http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/HadoopUtil.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/HadoopUtil.java 
b/mr/src/main/java/org/apache/mahout/common/HadoopUtil.java
new file mode 100644
index 0000000..f693821
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/HadoopUtil.java
@@ -0,0 +1,442 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import 
org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterator;
+import 
org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class HadoopUtil {
+
+  /** Shared SLF4J logger for the static helpers in this class. */
+  private static final Logger log = LoggerFactory.getLogger(HadoopUtil.class);
+
+  /** Private constructor: this class is not meant to be instantiated. */
+  private HadoopUtil() { }
+
+  /**
+   * Builds a Hadoop {@link Job} that has a map phase only (the number of reduce tasks is set
+   * to zero). The job name is left unset. The job works on a copy of {@code conf}, so the
+   * passed-in configuration is not modified.
+   *
+   * @param inputPath stored under the {@code mapred.input.dir} property
+   * @param outputPath stored under the {@code mapred.output.dir} property
+   * @param inputFormat input format class of the job
+   * @param mapper mapper class; also used to locate the job jar
+   * @param mapperKey key class, used for both the map output and the job output
+   * @param mapperValue value class, used for both the map output and the job output
+   * @param outputFormat output format class of the job
+   * @param conf base configuration that is copied for the new job
+   * @return the configured job
+   * @throws IOException if the job cannot be created
+   * @throws IllegalStateException if {@code mapper} is {@link Mapper} itself, since that class
+   *         cannot be used to find the user jar
+   * @see #getCustomJobName(String, org.apache.hadoop.mapreduce.JobContext, Class, Class)
+   */
+  public static Job prepareJob(Path inputPath,
+                           Path outputPath,
+                           Class<? extends InputFormat> inputFormat,
+                           Class<? extends Mapper> mapper,
+                           Class<? extends Writable> mapperKey,
+                           Class<? extends Writable> mapperValue,
+                           Class<? extends OutputFormat> outputFormat,
+                           Configuration conf) throws IOException {
+
+    Job mapOnlyJob = new Job(new Configuration(conf));
+    Configuration settings = mapOnlyJob.getConfiguration();
+
+    // The stock Mapper class says nothing about which jar holds the user code.
+    boolean stockMapper = mapper.equals(Mapper.class);
+    if (stockMapper) {
+      throw new IllegalStateException("Can't figure out the user class jar file from mapper/reducer");
+    }
+    mapOnlyJob.setJarByClass(mapper);
+
+    // Input side.
+    mapOnlyJob.setInputFormatClass(inputFormat);
+    settings.set("mapred.input.dir", inputPath.toString());
+
+    // Map side; without reducers the map output types are also the job output types.
+    mapOnlyJob.setMapperClass(mapper);
+    mapOnlyJob.setMapOutputKeyClass(mapperKey);
+    mapOnlyJob.setMapOutputValueClass(mapperValue);
+    mapOnlyJob.setOutputKeyClass(mapperKey);
+    mapOnlyJob.setOutputValueClass(mapperValue);
+    settings.setBoolean("mapred.compress.map.output", true);
+    mapOnlyJob.setNumReduceTasks(0);
+
+    // Output side.
+    mapOnlyJob.setOutputFormatClass(outputFormat);
+    settings.set("mapred.output.dir", outputPath.toString());
+
+    return mapOnlyJob;
+  }
+
+  /**
+   * Builds a Hadoop {@link Job} with both a map and a reduce phase. The job name is left unset.
+   * The job works on a copy of {@code conf}, so the passed-in configuration is not modified.
+   *
+   * @param inputPath The input {@link org.apache.hadoop.fs.Path}, stored under {@code mapred.input.dir}
+   * @param outputPath The output {@link org.apache.hadoop.fs.Path}, stored under {@code mapred.output.dir}
+   * @param inputFormat The {@link org.apache.hadoop.mapreduce.InputFormat}
+   * @param mapper The {@link org.apache.hadoop.mapreduce.Mapper} class to use
+   * @param mapperKey The {@link org.apache.hadoop.io.Writable} map output key class; when null the
+   *                  map output key class is not set
+   * @param mapperValue The {@link org.apache.hadoop.io.Writable} map output value class; when null
+   *                    the map output value class is not set
+   * @param reducer The {@link org.apache.hadoop.mapreduce.Reducer} to use; unless it is the stock
+   *                {@code Reducer} class it is also used to locate the job jar
+   * @param reducerKey The reducer (job output) key class.
+   * @param reducerValue The reducer (job output) value class.
+   * @param outputFormat The {@link org.apache.hadoop.mapreduce.OutputFormat}.
+   * @param conf The {@link org.apache.hadoop.conf.Configuration} to copy for the job.
+   * @return The {@link org.apache.hadoop.mapreduce.Job}.
+   * @throws IOException if there is a problem with the IO.
+   * @throws IllegalStateException if both {@code mapper} and {@code reducer} are the stock Hadoop
+   *         classes, so neither can be used to find the user jar
+   *
+   * @see #getCustomJobName(String, org.apache.hadoop.mapreduce.JobContext, Class, Class)
+   * @see #prepareJob(org.apache.hadoop.fs.Path, org.apache.hadoop.fs.Path, Class, Class, Class, Class, Class,
+   * org.apache.hadoop.conf.Configuration)
+   */
+  public static Job prepareJob(Path inputPath,
+                           Path outputPath,
+                           Class<? extends InputFormat> inputFormat,
+                           Class<? extends Mapper> mapper,
+                           Class<? extends Writable> mapperKey,
+                           Class<? extends Writable> mapperValue,
+                           Class<? extends Reducer> reducer,
+                           Class<? extends Writable> reducerKey,
+                           Class<? extends Writable> reducerValue,
+                           Class<? extends OutputFormat> outputFormat,
+                           Configuration conf) throws IOException {
+
+    Job mapReduceJob = new Job(new Configuration(conf));
+    Configuration settings = mapReduceJob.getConfiguration();
+
+    // Prefer the reducer for locating the jar; fall back to the mapper when the reducer is the
+    // stock Hadoop class.
+    Class<?> jarMarker;
+    if (!reducer.equals(Reducer.class)) {
+      jarMarker = reducer;
+    } else if (!mapper.equals(Mapper.class)) {
+      jarMarker = mapper;
+    } else {
+      throw new IllegalStateException("Can't figure out the user class jar file from mapper/reducer");
+    }
+    mapReduceJob.setJarByClass(jarMarker);
+
+    // Input side.
+    mapReduceJob.setInputFormatClass(inputFormat);
+    settings.set("mapred.input.dir", inputPath.toString());
+
+    // Map side; the output types are optional.
+    mapReduceJob.setMapperClass(mapper);
+    if (mapperKey != null) {
+      mapReduceJob.setMapOutputKeyClass(mapperKey);
+    }
+    if (mapperValue != null) {
+      mapReduceJob.setMapOutputValueClass(mapperValue);
+    }
+    settings.setBoolean("mapred.compress.map.output", true);
+
+    // Reduce side.
+    mapReduceJob.setReducerClass(reducer);
+    mapReduceJob.setOutputKeyClass(reducerKey);
+    mapReduceJob.setOutputValueClass(reducerValue);
+
+    // Output side.
+    mapReduceJob.setOutputFormatClass(outputFormat);
+    settings.set("mapred.output.dir", outputPath.toString());
+
+    return mapReduceJob;
+  }
+
+
+  /**
+   * Composes a job name of the form {@code <prefix>-<MapperSimpleName>-<ReducerSimpleName>}.
+   * The prefix is the name already set on {@code job}, or {@code className} when that name is
+   * null or blank.
+   *
+   * @param className fallback prefix
+   * @param job job context whose current name is consulted
+   * @param mapper mapper class; its simple name is appended
+   * @param reducer reducer class; its simple name is appended
+   * @return the composed job name
+   */
+  public static String getCustomJobName(String className, JobContext job,
+                                  Class<? extends Mapper> mapper,
+                                  Class<? extends Reducer> reducer) {
+    String presetName = job.getJobName();
+    boolean hasPresetName = presetName != null && !presetName.trim().isEmpty();
+    String prefix = hasPresetName ? presetName : className;
+    return prefix + '-' + mapper.getSimpleName() + '-' + reducer.getSimpleName();
+  }
+
+
+  /**
+   * Recursively deletes each of the given paths that exists, logging every deletion.
+   *
+   * @param conf configuration used to resolve each path's file system; a fresh default
+   *             {@link Configuration} is used when null
+   * @param paths paths to delete; paths that do not exist are skipped
+   * @throws IOException if a file system lookup or deletion fails
+   */
+  public static void delete(Configuration conf, Iterable<Path> paths) throws IOException {
+    Configuration effectiveConf = conf == null ? new Configuration() : conf;
+    for (Path target : paths) {
+      FileSystem fileSystem = target.getFileSystem(effectiveConf);
+      if (!fileSystem.exists(target)) {
+        continue;
+      }
+      log.info("Deleting {}", target);
+      fileSystem.delete(target, true);
+    }
+  }
+
+  /**
+   * Varargs convenience form of {@link #delete(Configuration, Iterable)}.
+   *
+   * @param conf configuration used to resolve the file systems; may be null
+   * @param paths paths to delete recursively
+   * @throws IOException if a deletion fails
+   */
+  public static void delete(Configuration conf, Path... paths) throws IOException {
+    List<Path> targets = Arrays.asList(paths);
+    delete(conf, targets);
+  }
+
+  /**
+   * Counts the records of a single sequence file by iterating over all of its values with a
+   * {@link SequenceFileValueIterator}.
+   *
+   * @param path sequence file to read
+   * @param conf Hadoop configuration used to open the file
+   * @return the number of values the iterator produced
+   * @throws IOException if the file cannot be opened
+   */
+  public static long countRecords(Path path, Configuration conf) throws IOException {
+    Iterator<?> values = new SequenceFileValueIterator<Writable>(path, true, conf);
+    long total = 0;
+    for (; values.hasNext(); total++) {
+      values.next();
+    }
+    return total;
+  }
+
+  /**
+   * Counts the records found under a path by iterating over all values with a
+   * {@link org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterator}.
+   *
+   * @param path The {@link org.apache.hadoop.fs.Path} to count
+   * @param pt The {@link org.apache.mahout.common.iterator.sequencefile.PathType} that tells the
+   *           iterator how to interpret {@code path}
+   * @param filter The {@link org.apache.hadoop.fs.PathFilter} to apply.  May be null
+   * @param conf The Hadoop {@link org.apache.hadoop.conf.Configuration}
+   * @return The number of records
+   * @throws IOException if there was an IO error
+   */
+  public static long countRecords(Path path, PathType pt, PathFilter filter, Configuration conf) throws IOException {
+    Iterator<?> values = new SequenceFileDirValueIterator<Writable>(path, pt, filter, null, true, conf);
+    long total = 0;
+    for (; values.hasNext(); total++) {
+      values.next();
+    }
+    return total;
+  }
+
+  /**
+   * Opens {@code path} for reading on the file system selected by the path's own URI.
+   * The caller is responsible for closing the returned stream.
+   *
+   * @param path file to open
+   * @param conf Hadoop configuration used to obtain the file system
+   * @return an open input stream on the file
+   * @throws IOException if the file system cannot be obtained or the file cannot be opened
+   */
+  public static InputStream openStream(Path path, Configuration conf) throws IOException {
+    URI location = path.toUri();
+    FileSystem fileSystem = FileSystem.get(location, conf);
+    // The path is qualified against its own URI, with itself as the working directory.
+    Path qualified = path.makeQualified(location, path);
+    return fileSystem.open(qualified);
+  }
+
+  /**
+   * Lists the {@link FileStatus} entries for {@code path}, either by glob expansion or by a
+   * directory listing, optionally filtered and sorted.
+   *
+   * @param path path or glob pattern to resolve
+   * @param pathType {@link PathType#GLOB} to expand {@code path} as a glob; any other value
+   *                 lists {@code path} directly
+   * @param filter optional filter applied to the results; may be null
+   * @param ordering optional comparator used to sort the results in place; may be null
+   * @param conf Hadoop configuration used to resolve the file system
+   * @return the matching statuses; an empty array (never null) when nothing matches
+   * @throws IOException if the file system cannot be accessed
+   */
+  public static FileStatus[] getFileStatus(Path path, PathType pathType, PathFilter filter,
+      Comparator<FileStatus> ordering, Configuration conf) throws IOException {
+    FileSystem fs = path.getFileSystem(conf);
+    FileStatus[] statuses;
+    if (pathType == PathType.GLOB) {
+      statuses = filter == null ? fs.globStatus(path) : fs.globStatus(path, filter);
+    } else {
+      statuses = filter == null ? listStatus(fs, path) : listStatus(fs, path, filter);
+    }
+    // The file system may report "no such path" as null; normalize it to an empty array so that
+    // the sort below and the callers' loops do not fail with a NullPointerException. This matches
+    // what the listStatus helpers of this class return for a missing path.
+    if (statuses == null) {
+      return new FileStatus[0];
+    }
+    if (ordering != null) {
+      Arrays.sort(statuses, ordering);
+    }
+    return statuses;
+  }
+
+  /**
+   * Lists the entries under {@code path}, treating a missing path as empty.
+   *
+   * @param fs file system to query
+   * @param path path to list
+   * @return the result of {@code fs.listStatus(path)}, or an empty array when that call throws
+   *         {@link FileNotFoundException}
+   * @throws IOException for any other IO failure
+   */
+  public static FileStatus[] listStatus(FileSystem fs, Path path) throws IOException {
+    FileStatus[] entries;
+    try {
+      entries = fs.listStatus(path);
+    } catch (FileNotFoundException missingPath) {
+      // A path that does not exist is reported as "no entries" rather than as an error.
+      entries = new FileStatus[0];
+    }
+    return entries;
+  }
+
+  /**
+   * Lists the entries under {@code path} that are accepted by {@code filter}, treating a
+   * missing path as empty.
+   *
+   * @param fs file system to query
+   * @param path path to list
+   * @param filter filter handed to {@code fs.listStatus}
+   * @return the result of {@code fs.listStatus(path, filter)}, or an empty array when that call
+   *         throws {@link FileNotFoundException}
+   * @throws IOException for any other IO failure
+   */
+  public static FileStatus[] listStatus(FileSystem fs, Path path, PathFilter filter) throws IOException {
+    FileStatus[] entries;
+    try {
+      entries = fs.listStatus(path, filter);
+    } catch (FileNotFoundException missingPath) {
+      // A path that does not exist is reported as "no entries" rather than as an error.
+      entries = new FileStatus[0];
+    }
+    return entries;
+  }
+
+  public static void cacheFiles(Path fileToCache, Configuration conf) {
+    DistributedCache.setCacheFiles(new URI[]{fileToCache.toUri()}, conf);
+  }
+
+  /**
+   * Returns the first cached file in the list obtained from {@link #getCachedFiles(Configuration)}.
+   * This method never returns null: when there are no cached files, {@code getCachedFiles}
+   * fails with an {@link IllegalStateException} instead.
+   *
+   * @param conf - MapReduce Configuration
+   * @return Path of the first cached file
+   * @throws IOException - IO Exception
+   * @throws IllegalStateException if no cache files are found
+   */
+  public static Path getSingleCachedFile(Configuration conf) throws IOException {
+    return getCachedFiles(conf)[0];
+  }
+
+  /**
+   * Retrieves paths to cached files.
+   * @param conf - MapReduce Configuration
+   * @return Path[] of Cached Files
+   * @throws IOException - IO Exception
+   * @throws IllegalStateException if no cache files are found
+   */
+  public static Path[] getCachedFiles(Configuration conf) throws IOException {
+    LocalFileSystem localFs = FileSystem.getLocal(conf);
+    Path[] cacheFiles = DistributedCache.getLocalCacheFiles(conf);
+
+    URI[] fallbackFiles = DistributedCache.getCacheFiles(conf);
+
+    // fallback for local execution
+    if (cacheFiles == null) {
+
+      Preconditions.checkState(fallbackFiles != null, "Unable to find cached 
files!");
+
+      cacheFiles = new Path[fallbackFiles.length];
+      for (int n = 0; n < fallbackFiles.length; n++) {
+        cacheFiles[n] = new Path(fallbackFiles[n].getPath());
+      }
+    } else {
+
+      for (int n = 0; n < cacheFiles.length; n++) {
+        cacheFiles[n] = localFs.makeQualified(cacheFiles[n]);
+        // fallback for local execution
+        if (!localFs.exists(cacheFiles[n])) {
+          cacheFiles[n] = new Path(fallbackFiles[n].getPath());
+        }
+      }
+    }
+
+    Preconditions.checkState(cacheFiles.length > 0, "Unable to find cached 
files!");
+
+    return cacheFiles;
+  }
+
+  public static void setSerializations(Configuration configuration) {
+    configuration.set("io.serializations", 
"org.apache.hadoop.io.serializer.JavaSerialization,"
+        + "org.apache.hadoop.io.serializer.WritableSerialization");
+  }
+
+  /**
+   * Writes a single int to the file at {@code path}, creating the file through
+   * {@code FileSystem.create}.
+   *
+   * @param value value to write
+   * @param path destination file
+   * @param configuration Hadoop configuration used to obtain the file system
+   * @throws IOException if creating, writing or closing the file fails
+   */
+  public static void writeInt(int value, Path path, Configuration configuration) throws IOException {
+    FileSystem fileSystem = FileSystem.get(path.toUri(), configuration);
+    FSDataOutputStream sink = fileSystem.create(path);
+    try {
+      sink.writeInt(value);
+    } finally {
+      // swallowIOException=false: a failure to close is propagated to the caller.
+      Closeables.close(sink, false);
+    }
+  }
+
+  /**
+   * Reads a single int from the start of the file at {@code path}.
+   *
+   * @param path file to read
+   * @param configuration Hadoop configuration used to obtain the file system
+   * @return the int read from the file
+   * @throws IOException if opening or reading the file fails
+   */
+  public static int readInt(Path path, Configuration configuration) throws IOException {
+    FileSystem fileSystem = FileSystem.get(path.toUri(), configuration);
+    FSDataInputStream source = fileSystem.open(path);
+    int value;
+    try {
+      value = source.readInt();
+    } finally {
+      // swallowIOException=true: a failure to close the input is not propagated.
+      Closeables.close(source, true);
+    }
+    return value;
+  }
+
+  /**
+   * Builds a comma-separated list of all directories at or below {@code fileStatus} that
+   * directly contain at least one file. Directories that hold only sub-directories (or nothing)
+   * are not listed themselves and add no empty entries to the result.
+   *
+   * @param fs - File System
+   * @param fileStatus - File Status of the directory to start from
+   * @return list of directories as a comma-separated String; empty if no directory contains files
+   * @throws IOException - IO Exception
+   */
+  public static String buildDirList(FileSystem fs, FileStatus fileStatus) throws IOException {
+    boolean containsFiles = false;
+    List<String> directoriesList = Lists.newArrayList();
+    for (FileStatus childFileStatus : fs.listStatus(fileStatus.getPath())) {
+      if (childFileStatus.isDir()) {
+        String subDirectoryList = buildDirList(fs, childFileStatus);
+        // A sub-tree without files yields ""; adding it would leave stray commas in the
+        // joined result.
+        if (!subDirectoryList.isEmpty()) {
+          directoriesList.add(subDirectoryList);
+        }
+      } else {
+        containsFiles = true;
+      }
+    }
+
+    if (containsFiles) {
+      directoriesList.add(fileStatus.getPath().toUri().getPath());
+    }
+    return Joiner.on(',').skipNulls().join(directoriesList.iterator());
+  }
+
+  /**
+   * Builds a comma-separated list of all directories at or below {@code fileStatus} that
+   * directly contain at least one file accepted by {@code pathFilter}. The filter is applied to
+   * the listing at every level of the recursion; directories that hold no accepted files are
+   * not listed themselves and add no empty entries to the result.
+   *
+   * @param fs - File System
+   * @param fileStatus - File Status of the directory to start from
+   * @param pathFilter - path filter applied to every directory listing
+   * @return list of directories as a comma-separated String; empty if no directory contains files
+   * @throws IOException - IO Exception
+   */
+  public static String buildDirList(FileSystem fs, FileStatus fileStatus, PathFilter pathFilter) throws IOException {
+    boolean containsFiles = false;
+    List<String> directoriesList = Lists.newArrayList();
+    for (FileStatus childFileStatus : fs.listStatus(fileStatus.getPath(), pathFilter)) {
+      if (childFileStatus.isDir()) {
+        // Keep filtering below the top level; the unfiltered overload would drop the filter.
+        String subDirectoryList = buildDirList(fs, childFileStatus, pathFilter);
+        // A sub-tree without files yields ""; adding it would leave stray commas in the
+        // joined result.
+        if (!subDirectoryList.isEmpty()) {
+          directoriesList.add(subDirectoryList);
+        }
+      } else {
+        containsFiles = true;
+      }
+    }
+
+    if (containsFiles) {
+      directoriesList.add(fileStatus.getPath().toUri().getPath());
+    }
+    return Joiner.on(',').skipNulls().join(directoriesList.iterator());
+  }
+
+  /**
+   * Computes the path of {@code filePath} relative to the base directory stored in the
+   * {@code baseinputpath} configuration property. A leading {@code file:} scheme is stripped
+   * from both paths before they are compared.
+   *
+   * <p>The {@code baseinputpath} property must be set; it is dereferenced unconditionally.
+   *
+   * @param configuration  -  configuration holding the {@code baseinputpath} property
+   * @param filePath - Input File Path
+   * @return relative file Path; the full (scheme-stripped) path if it does not contain the base path
+   * @throws IOException - IO Exception
+   */
+  public static String calcRelativeFilePath(Configuration configuration, Path filePath) throws IOException {
+    FileSystem fs = filePath.getFileSystem(configuration);
+    FileStatus fst = fs.getFileStatus(filePath);
+    String currentPath = fst.getPath().toString().replaceFirst("file:", "");
+
+    String basePath = configuration.get("baseinputpath");
+    if (!basePath.endsWith("/")) {
+      basePath += "/";
+    }
+    basePath = basePath.replaceFirst("file:", "");
+    // String.split takes a regular expression. Quote the base path so that characters such as
+    // '.', '+', '(' or '[' in directory names are matched literally instead of being
+    // interpreted as regex syntax.
+    String[] parts = currentPath.split(java.util.regex.Pattern.quote(basePath));
+
+    if (parts.length == 2) {
+      return parts[1];
+    } else if (parts.length == 1) {
+      return parts[0];
+    }
+    return currentPath;
+  }
+
+  /**
+   * Finds a file in the DistributedCache
+   *
+   * @param partOfFilename a substring to look for in the string form of each cache URI
+   * @param localFiles holds references to files stored in distributed cache; may be null or
+   *                   contain null elements, which are skipped
+   * @return Path to first matched file or null if nothing was found
+   **/
+  public static Path findInCacheByPartOfFilename(String partOfFilename, URI[] localFiles) {
+    // The message does not depend on the current file, so log it once instead of once per entry.
+    log.info("trying find a file in distributed cache containing [{}] in its name", partOfFilename);
+    if (localFiles == null) {
+      // No cache entries at all: report "not found" instead of failing with a NullPointerException.
+      return null;
+    }
+    for (URI distCacheFile : localFiles) {
+      if (distCacheFile != null && distCacheFile.toString().contains(partOfFilename)) {
+        log.info("found file [{}] containing [{}]", distCacheFile, partOfFilename);
+        return new Path(distCacheFile.getPath());
+      }
+    }
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/IntPairWritable.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/IntPairWritable.java 
b/mr/src/main/java/org/apache/mahout/common/IntPairWritable.java
new file mode 100644
index 0000000..dacd66f
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/IntPairWritable.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common;
+
+import org.apache.hadoop.io.BinaryComparable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+
+/**
+ * A {@link WritableComparable} which encapsulates an ordered pair of signed 
integers.
+ */
+public final class IntPairWritable extends BinaryComparable
+    implements WritableComparable<BinaryComparable>, Cloneable {
+
+  static final int INT_BYTE_LENGTH = 4;
+  static final int INT_PAIR_BYTE_LENGTH = 2 * INT_BYTE_LENGTH;
+  private byte[] b = new byte[INT_PAIR_BYTE_LENGTH];
+  
+  public IntPairWritable() {
+    setFirst(0);
+    setSecond(0);
+  }
+  
+  public IntPairWritable(IntPairWritable pair) {
+    b = Arrays.copyOf(pair.getBytes(), INT_PAIR_BYTE_LENGTH);
+  }
+  
+  public IntPairWritable(int x, int y) {
+    putInt(x, b, 0);
+    putInt(y, b, INT_BYTE_LENGTH);
+  }
+  
+  public void set(int x, int y) {
+    putInt(x, b, 0);
+    putInt(y, b, INT_BYTE_LENGTH);
+  }
+  
+  public void setFirst(int x) {
+    putInt(x, b, 0);
+  }
+  
+  public int getFirst() {
+    return getInt(b, 0);
+  }
+  
+  public void setSecond(int y) {
+    putInt(y, b, INT_BYTE_LENGTH);
+  }
+  
+  public int getSecond() {
+    return getInt(b, INT_BYTE_LENGTH);
+  }
+  
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    in.readFully(b);
+  }
+  
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.write(b);
+  }
+  
+  @Override
+  public int hashCode() {
+    return Arrays.hashCode(b);
+  }
+  
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (!super.equals(obj)) {
+      return false;
+    }
+    if (!(obj instanceof IntPairWritable)) {
+      return false;
+    }
+    IntPairWritable other = (IntPairWritable) obj;
+    return Arrays.equals(b, other.b);
+  }
+
+  @Override
+  public int compareTo(BinaryComparable other) {
+    return Comparator.doCompare(b, 0, ((IntPairWritable) other).b, 0);
+  }
+
+  @Override
+  public Object clone() {
+    return new IntPairWritable(this);
+  }
+  
+  @Override
+  public String toString() {
+    return "(" + getFirst() + ", " + getSecond() + ')';
+  }
+  
+  @Override
+  public byte[] getBytes() {
+    return b;
+  }
+  
+  @Override
+  public int getLength() {
+    return INT_PAIR_BYTE_LENGTH;
+  }
+  
+  private static void putInt(int value, byte[] b, int offset) {
+    for (int i = offset, j = 24; j >= 0; i++, j -= 8) {
+      b[i] = (byte) (value >> j);
+    }
+  }
+  
+  private static int getInt(byte[] b, int offset) {
+    int value = 0;
+    for (int i = offset, j = 24; j >= 0; i++, j -= 8) {
+      value |= (b[i] & 0xFF) << j;
+    }
+    return value;
+  }
+
+  static {
+    WritableComparator.define(IntPairWritable.class, new Comparator());
+  }
+
+  public static final class Comparator extends WritableComparator implements 
Serializable {
+    public Comparator() {
+      super(IntPairWritable.class);
+    }
+    
+    @Override
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+      return doCompare(b1, s1, b2, s2);
+    }
+
+    static int doCompare(byte[] b1, int s1, byte[] b2, int s2) {
+      int compare1 = compareInts(b1, s1, b2, s2);
+      if (compare1 != 0) {
+        return compare1;
+      }
+      return compareInts(b1, s1 + INT_BYTE_LENGTH, b2, s2 + INT_BYTE_LENGTH);
+    }
+
+    private static int compareInts(byte[] b1, int s1, byte[] b2, int s2) {
+      // Like WritableComparator.compareBytes(), but treats first byte as 
signed value
+      int end1 = s1 + INT_BYTE_LENGTH;
+      for (int i = s1, j = s2; i < end1; i++, j++) {
+        int a = b1[i];
+        int b = b2[j];
+        if (i > s1) {
+          a &= 0xff;
+          b &= 0xff;
+        }
+        if (a != b) {
+          return a - b;
+        }
+      }
+      return 0;
+    }
+  }
+  
+  /**
+   * Compare only the first part of the pair, so that reduce is called once 
for each value of the first part.
+   */
+  public static class FirstGroupingComparator extends WritableComparator 
implements Serializable {
+    
+    public FirstGroupingComparator() {
+      super(IntPairWritable.class);
+    }
+    
+    @Override
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+      int firstb1 = WritableComparator.readInt(b1, s1);
+      int firstb2 = WritableComparator.readInt(b2, s2);
+      if (firstb1 < firstb2) {
+        return -1;
+      } else if (firstb1 > firstb2) {
+        return 1;
+      } else {
+        return 0;
+      }
+    }
+    
+    @Override
+    public int compare(Object o1, Object o2) {
+      int firstb1 = ((IntPairWritable) o1).getFirst();
+      int firstb2 = ((IntPairWritable) o2).getFirst();
+      if (firstb1 < firstb2) {
+        return -1;
+      }
+      if (firstb1 > firstb2) {
+        return 1;
+      }
+      return 0;
+    }
+    
+  }
+  
+  /** A wrapper class that associates pairs with frequency (Occurrences) */
+  public static class Frequency implements Comparable<Frequency>, Serializable 
{
+    
+    private final IntPairWritable pair;
+    private final double frequency;
+
+    public Frequency(IntPairWritable bigram, double frequency) {
+      this.pair = new IntPairWritable(bigram);
+      this.frequency = frequency;
+    }
+
+    public double getFrequency() {
+      return frequency;
+    }
+
+    public IntPairWritable getPair() {
+      return pair;
+    }
+
+    @Override
+    public int hashCode() {
+      return pair.hashCode() + RandomUtils.hashDouble(frequency);
+    }
+    
+    @Override
+    public boolean equals(Object right) {
+      if (!(right instanceof Frequency)) {
+        return false;
+      }
+      Frequency that = (Frequency) right;
+      return pair.equals(that.pair) && frequency == that.frequency;
+    }
+    
+    @Override
+    public int compareTo(Frequency that) {
+      if (frequency < that.frequency) {
+        return -1;
+      }
+      if (frequency > that.frequency) {
+        return 1;
+      }
+      return 0;
+    }
+    
+    @Override
+    public String toString() {
+      return pair + "\t" + frequency;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/IntegerTuple.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/IntegerTuple.java 
b/mr/src/main/java/org/apache/mahout/common/IntegerTuple.java
new file mode 100644
index 0000000..f456d4d
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/IntegerTuple.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * An Ordered List of Integers which can be used in a Hadoop Map/Reduce Job
+ * 
+ * 
+ */
+public final class IntegerTuple implements WritableComparable<IntegerTuple> {
+  
+  private List<Integer> tuple = Lists.newArrayList();
+  
+  public IntegerTuple() { }
+  
+  public IntegerTuple(Integer firstEntry) {
+    add(firstEntry);
+  }
+  
+  public IntegerTuple(Iterable<Integer> entries) {
+    for (Integer entry : entries) {
+      add(entry);
+    }
+  }
+  
+  public IntegerTuple(Integer[] entries) {
+    for (Integer entry : entries) {
+      add(entry);
+    }
+  }
+  
+  /**
+   * add an entry to the end of the list
+   * 
+   * @param entry
+   * @return true if the items get added
+   */
+  public boolean add(Integer entry) {
+    return tuple.add(entry);
+  }
+  
+  /**
+   * Fetches the string at the given location
+   * 
+   * @param index
+   * @return String value at the given location in the tuple list
+   */
+  public Integer integerAt(int index) {
+    return tuple.get(index);
+  }
+  
+  /**
+   * Replaces the string at the given index with the given newString
+   * 
+   * @param index
+   * @param newInteger
+   * @return The previous value at that location
+   */
+  public Integer replaceAt(int index, Integer newInteger) {
+    return tuple.set(index, newInteger);
+  }
+  
+  /**
+   * Fetch the list of entries from the tuple
+   * 
+   * @return a List containing the strings in the order of insertion
+   */
+  public List<Integer> getEntries() {
+    return Collections.unmodifiableList(this.tuple);
+  }
+  
+  /**
+   * Returns the length of the tuple
+   * 
+   * @return length
+   */
+  public int length() {
+    return this.tuple.size();
+  }
+  
+  @Override
+  public String toString() {
+    return tuple.toString();
+  }
+  
+  @Override
+  public int hashCode() {
+    return tuple.hashCode();
+  }
+  
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    IntegerTuple other = (IntegerTuple) obj;
+    if (tuple == null) {
+      if (other.tuple != null) {
+        return false;
+      }
+    } else if (!tuple.equals(other.tuple)) {
+      return false;
+    }
+    return true;
+  }
+  
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int len = in.readInt();
+    tuple = Lists.newArrayListWithCapacity(len);
+    for (int i = 0; i < len; i++) {
+      int data = in.readInt();
+      tuple.add(data);
+    }
+  }
+  
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(tuple.size());
+    for (Integer entry : tuple) {
+      out.writeInt(entry);
+    }
+  }
+  
+  @Override
+  public int compareTo(IntegerTuple otherTuple) {
+    int thisLength = length();
+    int otherLength = otherTuple.length();
+    int min = Math.min(thisLength, otherLength);
+    for (int i = 0; i < min; i++) {
+      int ret = this.tuple.get(i).compareTo(otherTuple.integerAt(i));
+      if (ret == 0) {
+        continue;
+      }
+      return ret;
+    }
+    if (thisLength < otherLength) {
+      return -1;
+    } else if (thisLength > otherLength) {
+      return 1;
+    } else {
+      return 0;
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/LongPair.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/LongPair.java 
b/mr/src/main/java/org/apache/mahout/common/LongPair.java
new file mode 100644
index 0000000..5215e3a
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/LongPair.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common;
+
+import java.io.Serializable;
+
+import com.google.common.primitives.Longs;
+
+/** A simple, immutable (ordered) pair of longs, compared by first and then by second value. */
+public final class LongPair implements Comparable<LongPair>, Serializable {
+
+  private final long first;
+  private final long second;
+
+  public LongPair(long first, long second) {
+    this.first = first;
+    this.second = second;
+  }
+
+  public long getFirst() {
+    return first;
+  }
+
+  public long getSecond() {
+    return second;
+  }
+
+  /** Returns a new pair with the two values exchanged. */
+  public LongPair swap() {
+    return new LongPair(second, first);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof LongPair) {
+      LongPair that = (LongPair) obj;
+      return first == that.first && second == that.second;
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    // Swap the top and bottom 16 bits of the first hash (a rotation by 16); this makes the
+    // hash function probably different for (a,b) versus (b,a)
+    int rotatedFirst = Integer.rotateLeft(Longs.hashCode(first), 16);
+    return rotatedFirst ^ Longs.hashCode(second);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder text = new StringBuilder();
+    text.append('(').append(first).append(',').append(second).append(')');
+    return text.toString();
+  }
+
+  /** Orders by the first value; ties are broken by the second value. */
+  @Override
+  public int compareTo(LongPair o) {
+    int byFirst = Longs.compare(first, o.first);
+    return byFirst != 0 ? byFirst : Longs.compare(second, o.second);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/MemoryUtil.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/MemoryUtil.java 
b/mr/src/main/java/org/apache/mahout/common/MemoryUtil.java
new file mode 100644
index 0000000..f241b53
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/MemoryUtil.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.common;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Memory utilities: logs JVM heap statistics, either once on demand or periodically from a
+ * background daemon thread.
+ */
+public final class MemoryUtil {
+
+  private static final Logger log = LoggerFactory.getLogger(MemoryUtil.class);
+
+  /**
+   * Scheduler driving the periodic logger, or {@code null} when none is running. It is only read
+   * and written by the synchronized start/stop methods below.
+   */
+  private static ScheduledExecutorService scheduler;
+
+  private MemoryUtil() {
+  }
+
+  /**
+   * Logs current heap memory statistics (used, currently allocated and maximum bytes).
+   *
+   * @see Runtime
+   */
+  public static void logMemoryStatistics() {
+    Runtime runtime = Runtime.getRuntime();
+    long freeBytes = runtime.freeMemory();
+    long maxBytes = runtime.maxMemory();
+    long totalBytes = runtime.totalMemory();
+    long usedBytes = totalBytes - freeBytes;
+    log.info("Memory (bytes): {} used, {} heap, {} max", usedBytes, totalBytes,
+             maxBytes);
+  }
+
+  /**
+   * Constructs and starts a memory logger thread, stopping any logger that is already running.
+   *
+   * <p>Start and stop are serialized on the class monitor so that concurrent callers can neither
+   * orphan a running scheduler nor shut down one that another thread has just cleared.</p>
+   *
+   * @param rateInMillis how often memory info should be logged; also used as the initial delay
+   * @throws IllegalArgumentException if {@code rateInMillis} is not positive
+   */
+  public static synchronized void startMemoryLogger(long rateInMillis) {
+    stopMemoryLogger();
+    ScheduledExecutorService newScheduler = Executors.newScheduledThreadPool(1, new ThreadFactory() {
+      private final ThreadFactory delegate = Executors.defaultThreadFactory();
+
+      @Override
+      public Thread newThread(Runnable r) {
+        // Daemon thread: the logger must never keep the JVM alive on its own.
+        Thread t = delegate.newThread(r);
+        t.setDaemon(true);
+        return t;
+      }
+    });
+    Runnable memoryLoggerRunnable = new Runnable() {
+      @Override
+      public void run() {
+        logMemoryStatistics();
+      }
+    };
+    newScheduler.scheduleAtFixedRate(memoryLoggerRunnable, rateInMillis, rateInMillis,
+        TimeUnit.MILLISECONDS);
+    // Publish only after scheduling succeeded, so a rejected rate leaves no stale scheduler behind.
+    scheduler = newScheduler;
+  }
+
+  /**
+   * Constructs and starts a memory logger thread with a logging rate of 1000 milliseconds.
+   */
+  public static void startMemoryLogger() {
+    startMemoryLogger(1000);
+  }
+
+  /**
+   * Stops the memory logger, if any, started via {@link #startMemoryLogger(long)} or
+   * {@link #startMemoryLogger()}. Does nothing when no logger is running.
+   */
+  public static synchronized void stopMemoryLogger() {
+    if (scheduler != null) {
+      scheduler.shutdownNow();
+      scheduler = null;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/Pair.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/Pair.java b/mr/src/main/java/org/apache/mahout/common/Pair.java
new file mode 100644
index 0000000..d2ad6a1
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/Pair.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common;
+
+import java.io.Serializable;
+
+/**
+ * An immutable, ordered pair of two object references. Either element may be {@code null}.
+ *
+ * @param <A> type of the first element
+ * @param <B> type of the second element
+ */
+public final class Pair<A,B> implements Comparable<Pair<A,B>>, Serializable {
+
+  private final A first;
+  private final B second;
+
+  public Pair(A first, B second) {
+    this.first = first;
+    this.second = second;
+  }
+
+  /** @return the first element, possibly {@code null} */
+  public A getFirst() {
+    return first;
+  }
+
+  /** @return the second element, possibly {@code null} */
+  public B getSecond() {
+    return second;
+  }
+
+  /** @return a new pair holding the same two elements in the opposite order */
+  public Pair<B, A> swap() {
+    return new Pair<>(second, first);
+  }
+
+  /** Static factory equivalent to the constructor. */
+  public static <A,B> Pair<A,B> of(A a, B b) {
+    return new Pair<>(a, b);
+  }
+
+  /** Two pairs are equal when their elements are pairwise equal; {@code null} only equals {@code null}. */
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof Pair<?, ?>) {
+      Pair<?, ?> that = (Pair<?, ?>) obj;
+      return isEqualOrNulls(first, that.first) && isEqualOrNulls(second, that.second);
+    }
+    return false;
+  }
+
+  /** Null-tolerant equality: true when both are null or {@code obj1.equals(obj2)}. */
+  private static boolean isEqualOrNulls(Object obj1, Object obj2) {
+    if (obj1 == null) {
+      return obj2 == null;
+    }
+    return obj1.equals(obj2);
+  }
+
+  @Override
+  public int hashCode() {
+    // Swap the upper and lower 16 bits of the first hash so that (a,b) and (b,a)
+    // will usually hash to different values.
+    int rotatedFirstHash = Integer.rotateLeft(hashCodeNull(first), 16);
+    return rotatedFirstHash ^ hashCodeNull(second);
+  }
+
+  /** Hash code of {@code obj}, with {@code null} hashing to 0. */
+  private static int hashCodeNull(Object obj) {
+    if (obj == null) {
+      return 0;
+    }
+    return obj.hashCode();
+  }
+
+  /** @return the pair rendered as {@code (first,second)} */
+  @Override
+  public String toString() {
+    return "(" + first + "," + second + ")";
+  }
+
+  /**
+   * Defines an ordering on pairs that sorts by first value's natural ordering, ascending,
+   * and then by second value's natural ordering.
+   *
+   * @throws ClassCastException if types are not actually {@link Comparable}
+   * @throws NullPointerException if an element of this pair that has to be compared is {@code null}
+   */
+  @Override
+  public int compareTo(Pair<A,B> other) {
+    @SuppressWarnings("unchecked") // documented contract: fails with ClassCastException otherwise
+    Comparable<A> comparableFirst = (Comparable<A>) first;
+    int byFirst = comparableFirst.compareTo(other.getFirst());
+    if (byFirst == 0) {
+      @SuppressWarnings("unchecked") // same contract as above
+      Comparable<B> comparableSecond = (Comparable<B>) second;
+      return comparableSecond.compareTo(other.getSecond());
+    }
+    return byFirst;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/Parameters.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/Parameters.java b/mr/src/main/java/org/apache/mahout/common/Parameters.java
new file mode 100644
index 0000000..e74c534
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/Parameters.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common;
+
+import java.io.IOException;
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.util.GenericsUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A string-to-string parameter map that can be rendered to, and rebuilt from, a single string by
+ * means of Hadoop's {@link DefaultStringifier}.
+ */
+public class Parameters {
+
+  private static final Logger log = LoggerFactory.getLogger(Parameters.class);
+
+  private Map<String,String> params = Maps.newHashMap();
+
+  /** Creates an empty parameter set. */
+  public Parameters() {
+
+  }
+
+  /**
+   * Rebuilds a parameter set from a string produced by {@link #toString()}.
+   *
+   * @param serializedString the serialized form
+   * @throws IOException if the string cannot be deserialized
+   */
+  public Parameters(String serializedString) throws IOException {
+    this(parseParams(serializedString));
+  }
+
+  /**
+   * Wraps the given map directly (no copy is made), so later changes to either are shared.
+   */
+  protected Parameters(Map<String,String> params) {
+    this.params = params;
+  }
+
+  /** @return the value stored under {@code key}, or {@code null} if there is none */
+  public String get(String key) {
+    return params.get(key);
+  }
+
+  /** @return the value stored under {@code key}, or {@code defaultValue} if there is none */
+  public String get(String key, String defaultValue) {
+    String ret = params.get(key);
+    return ret == null ? defaultValue : ret;
+  }
+
+  /** Stores {@code value} under {@code key}, replacing any previous value. */
+  public void set(String key, String value) {
+    params.put(key, value);
+  }
+
+  /**
+   * @return the value stored under {@code key} parsed as an int, or {@code defaultValue} if there is none
+   * @throws NumberFormatException if a value is present but is not a valid int
+   */
+  public int getInt(String key, int defaultValue) {
+    String ret = params.get(key);
+    return ret == null ? defaultValue : Integer.parseInt(ret);
+  }
+
+  /**
+   * Serializes the parameters to a string that {@link #parseParams(String)} can read back.
+   *
+   * @return the serialized form, or the empty string if serialization fails (the failure is logged)
+   */
+  @Override
+  public String toString() {
+    try {
+      return newStringifier(params).toString(params);
+    } catch (IOException e) {
+      // toString() cannot throw a checked exception, so the failure is logged and swallowed.
+      log.info("Encountered IOException while serializing; returning empty string", e);
+      return "";
+    }
+  }
+
+  /** @return the human-readable {@link Map#toString()} form of the parameters */
+  public String print() {
+    return params.toString();
+  }
+
+  /**
+   * Deserializes a map previously written by {@link #toString()}.
+   *
+   * @param serializedString the serialized form
+   * @return the decoded key/value map
+   * @throws IOException if the string cannot be deserialized
+   */
+  public static Map<String,String> parseParams(String serializedString) throws IOException {
+    Map<String,String> prototype = Maps.newHashMap();
+    return newStringifier(prototype).fromString(serializedString);
+  }
+
+  /**
+   * Builds a stringifier for the runtime class of {@code prototype}, with Java serialization and
+   * Writable serialization registered in a fresh Hadoop configuration.
+   */
+  private static DefaultStringifier<Map<String,String>> newStringifier(Map<String,String> prototype) {
+    Configuration conf = new Configuration();
+    conf.set("io.serializations",
+             "org.apache.hadoop.io.serializer.JavaSerialization,"
+             + "org.apache.hadoop.io.serializer.WritableSerialization");
+    return new DefaultStringifier<>(conf, GenericsUtil.getClass(prototype));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/StringTuple.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/StringTuple.java b/mr/src/main/java/org/apache/mahout/common/StringTuple.java
new file mode 100644
index 0000000..0de1a4a
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/StringTuple.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * An ordered, growable list of strings that can be used as a key or value in a Hadoop
+ * Map/Reduce job.
+ */
+public final class StringTuple implements WritableComparable<StringTuple> {
+
+  private List<String> tuple = Lists.newArrayList();
+
+  /** Creates an empty tuple. */
+  public StringTuple() { }
+
+  /** Creates a tuple holding the single given entry. */
+  public StringTuple(String firstEntry) {
+    tuple.add(firstEntry);
+  }
+
+  /** Creates a tuple holding the given entries in iteration order. */
+  public StringTuple(Iterable<String> entries) {
+    for (String each : entries) {
+      tuple.add(each);
+    }
+  }
+
+  /** Creates a tuple holding the given entries in array order. */
+  public StringTuple(String[] entries) {
+    Collections.addAll(tuple, entries);
+  }
+
+  /**
+   * Appends an entry to the end of the tuple.
+   *
+   * @param entry the string to append
+   * @return the result of the underlying {@link List#add(Object)} call
+   */
+  public boolean add(String entry) {
+    return tuple.add(entry);
+  }
+
+  /**
+   * Fetches the string at the given position.
+   *
+   * @param index zero-based position
+   * @return the string stored at {@code index}
+   */
+  public String stringAt(int index) {
+    return tuple.get(index);
+  }
+
+  /**
+   * Replaces the string at the given position.
+   *
+   * @param index zero-based position
+   * @param newString the replacement value
+   * @return the value previously stored at {@code index}
+   */
+  public String replaceAt(int index, String newString) {
+    return tuple.set(index, newString);
+  }
+
+  /**
+   * Exposes the entries as a read-only view.
+   *
+   * @return an unmodifiable list of the strings, in insertion order
+   */
+  public List<String> getEntries() {
+    return Collections.unmodifiableList(tuple);
+  }
+
+  /**
+   * @return the number of entries in the tuple
+   */
+  public int length() {
+    return tuple.size();
+  }
+
+  @Override
+  public String toString() {
+    return tuple.toString();
+  }
+
+  @Override
+  public int hashCode() {
+    return tuple.hashCode();
+  }
+
+  /** Tuples are equal when they are of the same class and hold equal entry lists. */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    StringTuple that = (StringTuple) obj;
+    return tuple == null ? that.tuple == null : tuple.equals(that.tuple);
+  }
+
+  /** Replaces the contents with an int count followed by that many {@link Text} values read from {@code in}. */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int count = in.readInt();
+    tuple = Lists.newArrayListWithCapacity(count);
+    Text buffer = new Text();
+    int read = 0;
+    while (read < count) {
+      buffer.readFields(in);
+      tuple.add(buffer.toString());
+      read++;
+    }
+  }
+
+  /** Writes the entry count as an int, then every entry as a {@link Text} value. */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(tuple.size());
+    Text buffer = new Text();
+    for (String each : tuple) {
+      buffer.set(each);
+      buffer.write(out);
+    }
+  }
+
+  /**
+   * Compares entry by entry using {@link String#compareTo(String)}; when one tuple is a prefix
+   * of the other, the shorter tuple sorts first.
+   */
+  @Override
+  public int compareTo(StringTuple otherTuple) {
+    int thisLength = length();
+    int otherLength = otherTuple.length();
+    int shared = Math.min(thisLength, otherLength);
+    for (int i = 0; i < shared; i++) {
+      int byEntry = tuple.get(i).compareTo(otherTuple.stringAt(i));
+      if (byEntry != 0) {
+        return byEntry;
+      }
+    }
+    // All shared positions match: order by length (-1, 0 or 1).
+    return Integer.signum(thisLength - otherLength);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/StringUtils.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/StringUtils.java b/mr/src/main/java/org/apache/mahout/common/StringUtils.java
new file mode 100644
index 0000000..a064596
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/StringUtils.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common;
+
+import java.util.regex.Pattern;
+
+import com.thoughtworks.xstream.XStream;
+
+/**
+ * Converts an object to a one-line string representation and restores it again (both via
+ * XStream), and additionally offers a simple replacement of XML-reserved characters.
+ * Should use Hadoop Stringifier whenever available.
+ */
+public final class StringUtils {
+
+  private static final XStream XSTREAM = new XStream();
+  private static final Pattern NEWLINE_PATTERN = Pattern.compile("\n");
+  private static final Pattern XMLRESERVED = Pattern.compile("\"|\\&|\\<|\\>|\'");
+
+  private StringUtils() {
+    // static utility class, never instantiated
+  }
+
+  /**
+   * Converts the object to a one-line string representation: its XStream XML with every
+   * newline character removed.
+   *
+   * @param obj
+   *          the object to convert
+   * @return the string representation of the object
+   */
+  public static String toString(Object obj) {
+    String xml = XSTREAM.toXML(obj);
+    return NEWLINE_PATTERN.matcher(xml).replaceAll("");
+  }
+
+  /**
+   * Restores the object from its string representation.
+   *
+   * <p>NOTE(review): the string is handed to XStream as-is; confirm that callers only pass
+   * strings from trusted sources.</p>
+   *
+   * @param str
+   *          the string representation of the object
+   * @return restored object, cast unchecked to the type the caller expects
+   */
+  public static <T> T fromString(String str) {
+    @SuppressWarnings("unchecked") // the caller chooses T; a mismatch surfaces at the call site
+    T restored = (T) XSTREAM.fromXML(str);
+    return restored;
+  }
+
+  /**
+   * Replaces each double quote, ampersand, angle bracket and single quote with an underscore.
+   *
+   * @param input the text to scrub
+   * @return {@code input} with every XML-reserved character replaced by {@code _}
+   */
+  public static String escapeXML(CharSequence input) {
+    return XMLRESERVED.matcher(input).replaceAll("_");
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/TimingStatistics.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/TimingStatistics.java b/mr/src/main/java/org/apache/mahout/common/TimingStatistics.java
new file mode 100644
index 0000000..5ee2066
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/TimingStatistics.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common;
+
+import java.io.Serializable;
+import java.text.DecimalFormat;
+
+/**
+ * Accumulates timing statistics (call count, minimum, maximum, sum and sum of squares) for calls
+ * measured with {@link System#nanoTime()}; the durations recorded by {@link Call} are therefore
+ * in nanoseconds. All state is guarded by the monitor of the instance.
+ */
+public final class TimingStatistics implements Serializable {
+  private static final DecimalFormat DF = new DecimalFormat("#.##");
+  private int nCalls;
+  private long minTime;
+  private long maxTime;
+  private long sumTime;
+  private long leadSumTime;
+  private double sumSquaredTime;
+
+
+  /** Creates a new, empty instance. */
+  public TimingStatistics() { }
+
+  /** Creates an instance pre-loaded with previously collected statistics. */
+  public TimingStatistics(int nCalls, long minTime, long maxTime, long sumTime, double sumSquaredTime) {
+    this.nCalls = nCalls;
+    this.minTime = minTime;
+    this.maxTime = maxTime;
+    this.sumTime = sumTime;
+    this.sumSquaredTime = sumSquaredTime;
+  }
+
+  public synchronized int getNCalls() {
+    return nCalls;
+  }
+
+  /** @return the shortest recorded time, never negative */
+  public synchronized long getMinTime() {
+    return Math.max(0, minTime);
+  }
+
+  public synchronized long getMaxTime() {
+    return maxTime;
+  }
+
+  public synchronized long getSumTime() {
+    return sumTime;
+  }
+
+  public synchronized double getSumSquaredTime() {
+    return sumSquaredTime;
+  }
+
+  /** @return the mean time rounded towards zero, or 0 when nothing has been recorded */
+  public synchronized long getMeanTime() {
+    return nCalls == 0 ? 0 : sumTime / nCalls;
+  }
+
+  /**
+   * @return the population standard deviation rounded towards zero, or 0 when nothing has been
+   *         recorded
+   */
+  public synchronized long getStdDevTime() {
+    if (nCalls == 0) {
+      return 0;
+    }
+    // Use the exact mean here: the integer-truncated mean from getMeanTime() inflates the variance.
+    double mean = (double) sumTime / nCalls;
+    double meanOfSquares = sumSquaredTime / nCalls;
+    double variance = meanOfSquares - mean * mean;
+    if (variance < 0) {
+      return 0; // might happen due to rounding error
+    }
+    return (long) Math.sqrt(variance);
+  }
+
+  @Override
+  public synchronized String toString() {
+    // DecimalFormat is not thread-safe and DF is shared by all instances, so guard it explicitly.
+    synchronized (DF) {
+      return '\n'
+          + "nCalls = " + nCalls + ";\n"
+          + "sum    = " + DF.format(sumTime / 1000000000.0) + "s;\n"
+          + "min    = " + DF.format(minTime / 1000000.0) + "ms;\n"
+          + "max    = " + DF.format(maxTime / 1000000.0) + "ms;\n"
+          + "mean   = " + DF.format(getMeanTime() / 1000.0) + "us;\n"
+          + "stdDev = " + DF.format(getStdDevTime() / 1000.0) + "us;";
+    }
+  }
+
+  /**
+   * Starts timing a call. Calls are not counted in the performance metrics until the accumulated
+   * lead time exceeds {@code leadTimeUsec}; the caller should allow enough lead time for the JIT
+   * to warm up.
+   *
+   * @param leadTimeUsec warm-up budget; despite its name it is compared directly against a total
+   *                     accumulated from {@link System#nanoTime()} differences
+   * @return a {@link LeadTimeCall} while still warming up, otherwise a counting {@link Call}
+   */
+  public Call newCall(long leadTimeUsec) {
+    boolean warmedUp;
+    synchronized (this) {
+      warmedUp = leadSumTime > leadTimeUsec;
+    }
+    if (warmedUp) {
+      return new Call();
+    } else {
+      return new LeadTimeCall();
+    }
+  }
+
+  /**
+   * A call made during warm-up: its duration is added to the lead time only and is ignored by the
+   * performance metrics. The caller should allow enough lead time for the JIT to warm up.
+   */
+  public final class LeadTimeCall extends Call {
+
+    private LeadTimeCall() { }
+
+    @Override
+    public void end() {
+      long elapsed = System.nanoTime() - startTime;
+      synchronized (TimingStatistics.this) {
+        leadSumTime += elapsed;
+      }
+    }
+
+    /** Records the lead time and always returns false: warm-up calls never hit the limit. */
+    @Override
+    public boolean end(long sumMaxUsec) {
+      end();
+      return false;
+    }
+  }
+
+  /**
+   * A call object that can update performance metrics. Timing starts when the object is created.
+   */
+  public class Call {
+    protected final long startTime = System.nanoTime();
+
+    private Call() { }
+
+    /** Stops timing and folds the elapsed time into the enclosing statistics. */
+    public void end() {
+      long elapsed = System.nanoTime() - startTime;
+      synchronized (TimingStatistics.this) {
+        nCalls++;
+        if (elapsed < minTime || nCalls == 1) {
+          minTime = elapsed;
+        }
+        if (elapsed > maxTime) {
+          maxTime = elapsed;
+        }
+        sumTime += elapsed;
+        // Widen before multiplying: a long product overflows once a call exceeds about 3 seconds.
+        sumSquaredTime += (double) elapsed * elapsed;
+      }
+    }
+
+    /**
+     * Stops timing like {@link #end()} and reports whether the total has passed the given limit.
+     *
+     * @param sumMaxUsec limit; despite its name it is compared directly against the total
+     *                   accumulated from {@link System#nanoTime()} differences
+     * @return true if the total recorded time now exceeds {@code sumMaxUsec}
+     */
+    public boolean end(long sumMaxUsec) {
+      end();
+      synchronized (TimingStatistics.this) {
+        return sumMaxUsec < sumTime;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/commandline/DefaultOptionCreator.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/commandline/DefaultOptionCreator.java b/mr/src/main/java/org/apache/mahout/common/commandline/DefaultOptionCreator.java
new file mode 100644
index 0000000..0e7ee96
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/commandline/DefaultOptionCreator.java
@@ -0,0 +1,417 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.commandline;
+
+import org.apache.commons.cli2.Option;
+import org.apache.commons.cli2.builder.ArgumentBuilder;
+import org.apache.commons.cli2.builder.DefaultOptionBuilder;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.mahout.common.distance.SquaredEuclideanDistanceMeasure;
+import org.apache.mahout.clustering.kernel.TriangularKernelProfile;
+
+
+public final class DefaultOptionCreator {
+  
+  // Option name constants. Several are passed to withLongName(...) by the factory methods
+  // of this class; the rest are presumably used by factory methods or callers outside this
+  // part of the file (TODO confirm).
+  public static final String CLUSTERING_OPTION = "clustering";
+  
+  public static final String CLUSTERS_IN_OPTION = "clusters";
+  
+  public static final String CONVERGENCE_DELTA_OPTION = "convergenceDelta";
+  
+  public static final String DISTANCE_MEASURE_OPTION = "distanceMeasure";
+  
+  public static final String EMIT_MOST_LIKELY_OPTION = "emitMostLikely";
+  
+  public static final String INPUT_OPTION = "input";
+  
+  public static final String MAX_ITERATIONS_OPTION = "maxIter";
+  
+  public static final String MAX_REDUCERS_OPTION = "maxRed";
+  
+  public static final String METHOD_OPTION = "method";
+  
+  public static final String NUM_CLUSTERS_OPTION = "numClusters";
+  
+  public static final String OUTPUT_OPTION = "output";
+  
+  public static final String OVERWRITE_OPTION = "overwrite";
+  
+  public static final String T1_OPTION = "t1";
+  
+  public static final String T2_OPTION = "t2";
+  
+  public static final String T3_OPTION = "t3";
+  
+  public static final String T4_OPTION = "t4";
+  
+  public static final String OUTLIER_THRESHOLD = "outlierThreshold";
+  
+  public static final String CLUSTER_FILTER_OPTION = "clusterFilter";
+  
+  public static final String THRESHOLD_OPTION = "threshold";
+  
+  // Values for the METHOD_OPTION argument; methodOption() uses MAPREDUCE_METHOD as its default.
+  public static final String SEQUENTIAL_METHOD = "sequential";
+  
+  public static final String MAPREDUCE_METHOD = "mapreduce";
+  
+  public static final String KERNEL_PROFILE_OPTION = "kernelProfile";
+
+  public static final String ANALYZER_NAME_OPTION = "analyzerName";
+ 
+  public static final String RANDOM_SEED = "randomSeed";
+  
+  // Private constructor: this class is not meant to be instantiated.
+  private DefaultOptionCreator() {}
+  
+  /**
+   * Builds the default help option (long name "help", short name "h"). Used by all
+   * clustering jobs and many others.
+   *
+   * @return the finished help {@link Option}
+   */
+  public static Option helpOption() {
+    DefaultOptionBuilder help = new DefaultOptionBuilder();
+    return help
+        .withLongName("help")
+        .withDescription("Print out help")
+        .withShortName("h")
+        .create();
+  }
+  
+  /**
+   * Builds the default option for specifying the job input directory: optional, short name
+   * "i", taking exactly one argument. Used by all clustering jobs plus others.
+   *
+   * @return a builder the caller can adjust further before creating the option
+   */
+  public static DefaultOptionBuilder inputOption() {
+    ArgumentBuilder path = new ArgumentBuilder()
+        .withName(INPUT_OPTION)
+        .withMinimum(1)
+        .withMaximum(1);
+    return new DefaultOptionBuilder()
+        .withLongName(INPUT_OPTION)
+        .withRequired(false)
+        .withShortName("i")
+        .withArgument(path.create())
+        .withDescription("Path to job input directory.");
+  }
+  
+  /**
+   * Builds the default option for specifying the initial clusters directory: required, short
+   * name "c", taking exactly one argument. Used by FuzzyKmeans, Kmeans.
+   *
+   * @return a builder the caller can adjust further before creating the option
+   */
+  public static DefaultOptionBuilder clustersInOption() {
+    ArgumentBuilder path = new ArgumentBuilder()
+        .withName(CLUSTERS_IN_OPTION)
+        .withMinimum(1)
+        .withMaximum(1);
+    return new DefaultOptionBuilder()
+        .withLongName(CLUSTERS_IN_OPTION)
+        .withRequired(true)
+        .withArgument(path.create())
+        .withDescription(
+            "The path to the initial clusters directory. Must be a SequenceFile of some type of Cluster")
+        .withShortName("c");
+  }
+  
+  /**
+   * Builds the default option for specifying the output directory: optional, short name "o",
+   * taking exactly one argument. Used by all clustering jobs plus others.
+   *
+   * @return a builder the caller can adjust further before creating the option
+   */
+  public static DefaultOptionBuilder outputOption() {
+    ArgumentBuilder path = new ArgumentBuilder()
+        .withName(OUTPUT_OPTION)
+        .withMinimum(1)
+        .withMaximum(1);
+    return new DefaultOptionBuilder()
+        .withLongName(OUTPUT_OPTION)
+        .withRequired(false)
+        .withShortName("o")
+        .withArgument(path.create())
+        .withDescription("The directory pathname for output.");
+  }
+  
+  /**
+   * Builds the default flag for overwriting the output directory: optional, short name "ow",
+   * no argument. Used by all clustering jobs.
+   *
+   * @return a builder the caller can adjust further before creating the option
+   */
+  public static DefaultOptionBuilder overwriteOption() {
+    DefaultOptionBuilder overwrite = new DefaultOptionBuilder();
+    return overwrite
+        .withLongName(OVERWRITE_OPTION)
+        .withRequired(false)
+        .withDescription(
+            "If present, overwrite the output directory before running job")
+        .withShortName("ow");
+  }
+  
+  /**
+   * Builds the default option for choosing the distance measure class: optional, short name
+   * "dm", one argument defaulting to {@link SquaredEuclideanDistanceMeasure}. Used by Canopy,
+   * FuzzyKmeans, Kmeans, MeanShift.
+   *
+   * @return a builder the caller can adjust further before creating the option
+   */
+  public static DefaultOptionBuilder distanceMeasureOption() {
+    ArgumentBuilder className = new ArgumentBuilder()
+        .withName(DISTANCE_MEASURE_OPTION)
+        .withDefault(SquaredEuclideanDistanceMeasure.class.getName())
+        .withMinimum(1)
+        .withMaximum(1);
+    return new DefaultOptionBuilder()
+        .withLongName(DISTANCE_MEASURE_OPTION)
+        .withRequired(false)
+        .withShortName("dm")
+        .withArgument(className.create())
+        .withDescription(
+            "The classname of the DistanceMeasure. Default is SquaredEuclidean");
+  }
+  
+  /**
+   * Builds the default option for choosing sequential or parallel execution: optional, short
+   * name "xm", one argument defaulting to {@link #MAPREDUCE_METHOD}. Used by Canopy,
+   * FuzzyKmeans, Kmeans, MeanShift, Dirichlet.
+   *
+   * @return a builder the caller can adjust further before creating the option
+   */
+  public static DefaultOptionBuilder methodOption() {
+    ArgumentBuilder method = new ArgumentBuilder()
+        .withName(METHOD_OPTION)
+        .withDefault(MAPREDUCE_METHOD)
+        .withMinimum(1)
+        .withMaximum(1);
+    return new DefaultOptionBuilder()
+        .withLongName(METHOD_OPTION)
+        .withRequired(false)
+        .withShortName("xm")
+        .withArgument(method.create())
+        .withDescription(
+            "The execution method to use: sequential or mapreduce. Default is mapreduce");
+  }
+  
+  /**
+   * Builds the default option for the T1 threshold: required, long and short name both "t1",
+   * taking exactly one argument. Used by Canopy, MeanShift.
+   *
+   * @return a builder the caller can adjust further before creating the option
+   */
+  public static DefaultOptionBuilder t1Option() {
+    ArgumentBuilder threshold = new ArgumentBuilder()
+        .withName(T1_OPTION)
+        .withMinimum(1)
+        .withMaximum(1);
+    return new DefaultOptionBuilder()
+        .withLongName(T1_OPTION)
+        .withRequired(true)
+        .withArgument(threshold.create())
+        .withDescription("T1 threshold value")
+        .withShortName(T1_OPTION);
+  }
+  
+  /**
+   * Builds the default option for the T2 threshold: required, long and short name both "t2",
+   * taking exactly one argument. Used by Canopy, MeanShift.
+   *
+   * @return a builder the caller can adjust further before creating the option
+   */
+  public static DefaultOptionBuilder t2Option() {
+    ArgumentBuilder threshold = new ArgumentBuilder()
+        .withName(T2_OPTION)
+        .withMinimum(1)
+        .withMaximum(1);
+    return new DefaultOptionBuilder()
+        .withLongName(T2_OPTION)
+        .withRequired(true)
+        .withArgument(threshold.create())
+        .withDescription("T2 threshold value")
+        .withShortName(T2_OPTION);
+  }
+  
+  /**
+   * Builds the default option for the T3 (Reducer T1) threshold: optional, long and short
+   * name both "t3", taking exactly one argument. Used by Canopy.
+   *
+   * @return a builder the caller can adjust further before creating the option
+   */
+  public static DefaultOptionBuilder t3Option() {
+    ArgumentBuilder threshold = new ArgumentBuilder()
+        .withName(T3_OPTION)
+        .withMinimum(1)
+        .withMaximum(1);
+    return new DefaultOptionBuilder()
+        .withLongName(T3_OPTION)
+        .withRequired(false)
+        .withArgument(threshold.create())
+        .withDescription("T3 (Reducer T1) threshold value")
+        .withShortName(T3_OPTION);
+  }
+  
+  /**
+   * Builds the default option for the T4 (Reducer T2) threshold: optional, long and short
+   * name both "t4", taking exactly one argument. Used by Canopy.
+   *
+   * @return a builder the caller can adjust further before creating the option
+   */
+  public static DefaultOptionBuilder t4Option() {
+    ArgumentBuilder threshold = new ArgumentBuilder()
+        .withName(T4_OPTION)
+        .withMinimum(1)
+        .withMaximum(1);
+    return new DefaultOptionBuilder()
+        .withLongName(T4_OPTION)
+        .withRequired(false)
+        .withArgument(threshold.create())
+        .withDescription("T4 (Reducer T2) threshold value")
+        .withShortName(T4_OPTION);
+  }
+  
+  /**
+   * Builds the optional command line option for the cluster filter. The option
+   * accepts exactly one value. Note that {@code withShortName} is called twice,
+   * first with "cf" and then with {@code CLUSTER_FILTER_OPTION}.
+   *
+   * @return a DefaultOptionBuilder for the clusterFilter option
+   */
+  public static DefaultOptionBuilder clusterFilterOption() {
+    return new DefaultOptionBuilder()
+        .withLongName(CLUSTER_FILTER_OPTION)
+        .withShortName("cf")
+        .withRequired(false)
+        .withArgument(
+            new ArgumentBuilder().withName(CLUSTER_FILTER_OPTION).withMinimum(1)
+                .withMaximum(1).create())
+        .withDescription("Cluster filter suppresses small canopies from mapper")
+        .withShortName(CLUSTER_FILTER_OPTION);
+  }
+  
+  /**
+   * Builds the command line option used to specify the maximum number of
+   * iterations. The option is required; its single value defaults to "-1".
+   * Used by Dirichlet, FuzzyKmeans, Kmeans, LDA.
+   *
+   * @return a {@link DefaultOptionBuilder} configured for the max iterations option
+   */
+  public static DefaultOptionBuilder maxIterationsOption() {
+    // default value used by LDA which overrides withRequired(false)
+    ArgumentBuilder iterationCount = new ArgumentBuilder()
+        .withName(MAX_ITERATIONS_OPTION)
+        .withDefault("-1")
+        .withMinimum(1)
+        .withMaximum(1);
+    return new DefaultOptionBuilder()
+        .withLongName(MAX_ITERATIONS_OPTION)
+        .withRequired(true)
+        .withShortName("x")
+        .withArgument(iterationCount.create())
+        .withDescription("The maximum number of iterations.");
+  }
+  
+  /**
+   * Builds the command line option used to specify the number of clusters to
+   * create. The option is not required and accepts exactly one value named "k".
+   * Used by Dirichlet, FuzzyKmeans, Kmeans.
+   *
+   * @return a {@link DefaultOptionBuilder} configured for the cluster count option
+   */
+  public static DefaultOptionBuilder numClustersOption() {
+    ArgumentBuilder clusterCount = new ArgumentBuilder()
+        .withName("k")
+        .withMinimum(1)
+        .withMaximum(1);
+    return new DefaultOptionBuilder()
+        .withLongName(NUM_CLUSTERS_OPTION)
+        .withRequired(false)
+        .withArgument(clusterCount.create())
+        .withDescription("The number of clusters to create")
+        .withShortName("k");
+  }
+
+  /**
+   * Builds the optional command line option that carries a seed for the random
+   * number generator. The short name is "rs".
+   *
+   * @return a {@link DefaultOptionBuilder} configured for the random seed option
+   */
+  public static DefaultOptionBuilder useSetRandomSeedOption() {
+    return new DefaultOptionBuilder()
+        .withLongName(RANDOM_SEED)
+        .withRequired(false)
+        .withArgument(new ArgumentBuilder().withName(RANDOM_SEED).create())
+        .withDescription("Seed to initialize Random Number Generator with")
+        .withShortName("rs");
+  }
+  
+  /**
+   * Builds the command line option used to specify the convergence delta. The
+   * option is not required; its single value defaults to "0.5". Used by
+   * FuzzyKmeans, Kmeans, MeanShift.
+   *
+   * @return a {@link DefaultOptionBuilder} configured for the convergence delta option
+   */
+  public static DefaultOptionBuilder convergenceOption() {
+    ArgumentBuilder delta = new ArgumentBuilder()
+        .withName(CONVERGENCE_DELTA_OPTION)
+        .withDefault("0.5")
+        .withMinimum(1)
+        .withMaximum(1);
+    return new DefaultOptionBuilder()
+        .withLongName(CONVERGENCE_DELTA_OPTION)
+        .withRequired(false)
+        .withShortName("cd")
+        .withArgument(delta.create())
+        .withDescription("The convergence delta value. Default is 0.5");
+  }
+  
+  /**
+   * Builds the command line option used to specify the maximum number of
+   * reducers. The option is not required; its single value defaults to "2".
+   * Used by Dirichlet, FuzzyKmeans, Kmeans and LDA.
+   *
+   * @return a {@link DefaultOptionBuilder} configured for the reducer count option
+   * @deprecated
+   */
+  @Deprecated
+  public static DefaultOptionBuilder numReducersOption() {
+    ArgumentBuilder reducerCount = new ArgumentBuilder()
+        .withName(MAX_REDUCERS_OPTION)
+        .withDefault("2")
+        .withMinimum(1)
+        .withMaximum(1);
+    return new DefaultOptionBuilder()
+        .withLongName(MAX_REDUCERS_OPTION)
+        .withRequired(false)
+        .withShortName("r")
+        .withArgument(reducerCount.create())
+        .withDescription("The number of reduce tasks. Defaults to 2");
+  }
+  
+  /**
+   * Returns a default command line option for clustering specification. Used 
by
+   * all clustering except LDA
+   */
+  public static DefaultOptionBuilder clusteringOption() {
+    return new DefaultOptionBuilder()
+        .withLongName(CLUSTERING_OPTION)
+        .withRequired(false)
+        .withDescription(
+            "If present, run clustering after the iterations have taken place")
+        .withShortName("cl");
+  }
+
+  /**
+   * Returns a default command line option for specifying a Lucene analyzer class.
+   * The option is not required; its single value defaults to the class name of
+   * {@code StandardAnalyzer}.
+   *
+   * @return {@link DefaultOptionBuilder}
+   */
+  public static DefaultOptionBuilder analyzerOption() {
+    return new DefaultOptionBuilder()
+        .withLongName(ANALYZER_NAME_OPTION)
+        .withRequired(false)
+        .withDescription("If present, the name of a Lucene analyzer class to use")
+        .withArgument(
+            new ArgumentBuilder().withName(ANALYZER_NAME_OPTION)
+                .withDefault(StandardAnalyzer.class.getName())
+                .withMinimum(1).withMaximum(1).create())
+        .withShortName("an");
+  }
+
+  
+  /**
+   * Builds the command line option used to specify the emitMostLikely flag. The
+   * option is not required; its single value defaults to "true". Used by
+   * Dirichlet and FuzzyKmeans.
+   *
+   * @return a {@link DefaultOptionBuilder} configured for the emitMostLikely option
+   */
+  public static DefaultOptionBuilder emitMostLikelyOption() {
+    ArgumentBuilder flagValue = new ArgumentBuilder()
+        .withName(EMIT_MOST_LIKELY_OPTION)
+        .withDefault("true")
+        .withMinimum(1)
+        .withMaximum(1);
+    return new DefaultOptionBuilder()
+        .withLongName(EMIT_MOST_LIKELY_OPTION)
+        .withRequired(false)
+        .withShortName("e")
+        .withArgument(flagValue.create())
+        .withDescription(
+            "True if clustering should emit the most likely point only, "
+                + "false for threshold clustering. Default is true");
+  }
+  
+  /**
+   * Builds the command line option used to specify the clustering threshold
+   * value. The option is not required; its single value defaults to "0". Used
+   * by Dirichlet and FuzzyKmeans.
+   *
+   * @return a {@link DefaultOptionBuilder} configured for the threshold option
+   */
+  public static DefaultOptionBuilder thresholdOption() {
+    ArgumentBuilder thresholdValue = new ArgumentBuilder()
+        .withName(THRESHOLD_OPTION)
+        .withDefault("0")
+        .withMinimum(1)
+        .withMaximum(1);
+    return new DefaultOptionBuilder()
+        .withLongName(THRESHOLD_OPTION)
+        .withRequired(false)
+        .withShortName("t")
+        .withArgument(thresholdValue.create())
+        .withDescription(
+            "The pdf threshold used for cluster determination. Default is 0");
+  }
+  
+  /**
+   * Builds the optional command line option that names the kernel profile
+   * class. Its single value defaults to the class name of
+   * {@code TriangularKernelProfile}.
+   *
+   * @return a {@link DefaultOptionBuilder} configured for the kernel profile option
+   */
+  public static DefaultOptionBuilder kernelProfileOption() {
+    return new DefaultOptionBuilder()
+        .withLongName(KERNEL_PROFILE_OPTION)
+        .withRequired(false)
+        .withShortName("kp")
+        .withArgument(
+            new ArgumentBuilder()
+                .withName(KERNEL_PROFILE_OPTION)
+                .withDefault(TriangularKernelProfile.class.getName())
+                .withMinimum(1).withMaximum(1).create())
+        .withDescription(
+            "The classname of the IKernelProfile. Default is TriangularKernelProfile");
+  }
+  
+  /**
+   * Returns a default command line option for specification of OUTLIER THRESHOLD
+   * value. The option is not required and accepts exactly one value. Used for
+   * Cluster Classification.
+   *
+   * @return a {@link DefaultOptionBuilder} configured for the outlier threshold option
+   */
+  public static DefaultOptionBuilder outlierThresholdOption() {
+    return new DefaultOptionBuilder()
+        .withLongName(OUTLIER_THRESHOLD)
+        .withRequired(false)
+        .withArgument(
+            new ArgumentBuilder().withName(OUTLIER_THRESHOLD).withMinimum(1)
+                .withMaximum(1).create())
+        .withDescription("Outlier threshold value")
+        .withShortName(OUTLIER_THRESHOLD);
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/distance/ChebyshevDistanceMeasure.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/distance/ChebyshevDistanceMeasure.java b/mr/src/main/java/org/apache/mahout/common/distance/ChebyshevDistanceMeasure.java
new file mode 100644
index 0000000..61aa9a5
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/distance/ChebyshevDistanceMeasure.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.distance;
+
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.mahout.common.parameters.Parameter;
+import org.apache.mahout.math.CardinalityException;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.function.Functions;
+
+/**
+ * This class implements a "Chebyshev distance" metric by finding the maximum difference
+ * between each coordinate. Also 'chessboard distance' due to the moves a king can make.
+ */
+public class ChebyshevDistanceMeasure implements DistanceMeasure {
+
+  @Override
+  public void configure(Configuration job) {
+    // nothing to do
+  }
+
+  /**
+   * @return an empty collection; this measure has no parameters
+   */
+  @Override
+  public Collection<Parameter<?>> getParameters() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public void createParameters(String prefix, Configuration jobConf) {
+    // nothing to do
+  }
+
+  /**
+   * Aggregates the two vectors with {@code Functions.MINUS} as the combiner and
+   * {@code Functions.MAX_ABS} as the aggregator.
+   *
+   * @throws CardinalityException if the two vectors differ in size
+   */
+  @Override
+  public double distance(Vector v1, Vector v2) {
+    if (v1.size() != v2.size()) {
+      throw new CardinalityException(v1.size(), v2.size());
+    }
+    return v1.aggregate(v2, Functions.MAX_ABS, Functions.MINUS);
+  }
+
+  /**
+   * Delegates to {@link #distance(Vector, Vector)}; {@code centroidLengthSquare} is not used.
+   */
+  @Override
+  public double distance(double centroidLengthSquare, Vector centroid, Vector v) {
+    return distance(centroid, v); // TODO: centroidLengthSquare is currently ignored
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/distance/CosineDistanceMeasure.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/distance/CosineDistanceMeasure.java b/mr/src/main/java/org/apache/mahout/common/distance/CosineDistanceMeasure.java
new file mode 100644
index 0000000..37265eb
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/distance/CosineDistanceMeasure.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.distance;
+
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.mahout.common.parameters.Parameter;
+import org.apache.mahout.math.CardinalityException;
+import org.apache.mahout.math.Vector;
+
+/**
+ * This class implements a cosine distance metric by dividing the dot product of two vectors by the
+ * product of their lengths.  That gives the cosine of the angle between the two vectors.  To convert
+ * this to a usable distance, 1-cos(angle) is what is actually returned.
+ */
+public class CosineDistanceMeasure implements DistanceMeasure {
+
+  @Override
+  public void configure(Configuration job) {
+    // nothing to do
+  }
+
+  /**
+   * @return an empty collection; this measure has no parameters
+   */
+  @Override
+  public Collection<Parameter<?>> getParameters() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public void createParameters(String prefix, Configuration jobConf) {
+    // nothing to do
+  }
+
+  /**
+   * Computes the cosine distance between two points given as arrays.
+   * The loop runs over {@code p1.length} elements, so {@code p2} must hold at least that many
+   * values; any further elements of {@code p2} are ignored.
+   *
+   * @param p1 coordinates of the first point
+   * @param p2 coordinates of the second point
+   * @return 1 - cos(angle) between the two points, or 0 when the denominator and dot product are both 0
+   */
+  public static double distance(double[] p1, double[] p2) {
+    double dotProduct = 0.0;
+    double lengthSquaredp1 = 0.0;
+    double lengthSquaredp2 = 0.0;
+    for (int i = 0; i < p1.length; i++) {
+      lengthSquaredp1 += p1[i] * p1[i];
+      lengthSquaredp2 += p2[i] * p2[i];
+      dotProduct += p1[i] * p2[i];
+    }
+    return cosineDistance(dotProduct, lengthSquaredp1, lengthSquaredp2);
+  }
+
+  /**
+   * Computes the cosine distance between two vectors.
+   *
+   * @throws CardinalityException if the two vectors differ in size
+   */
+  @Override
+  public double distance(Vector v1, Vector v2) {
+    if (v1.size() != v2.size()) {
+      throw new CardinalityException(v1.size(), v2.size());
+    }
+    double lengthSquaredv1 = v1.getLengthSquared();
+    double lengthSquaredv2 = v2.getLengthSquared();
+    double dotProduct = v2.dot(v1);
+    return cosineDistance(dotProduct, lengthSquaredv1, lengthSquaredv2);
+  }
+
+  /**
+   * Computes the cosine distance between a centroid and a vector, using the supplied
+   * {@code centroidLengthSquare} instead of asking the centroid for its squared length.
+   */
+  @Override
+  public double distance(double centroidLengthSquare, Vector centroid, Vector v) {
+    double lengthSquaredv = v.getLengthSquared();
+    double dotProduct = v.dot(centroid);
+    return cosineDistance(dotProduct, centroidLengthSquare, lengthSquaredv);
+  }
+
+  /**
+   * Shared final step of all three distance variants: turns a dot product and the two squared
+   * lengths into 1 - cos(angle).
+   */
+  private static double cosineDistance(double dotProduct, double lengthSquared1, double lengthSquared2) {
+    double denominator = Math.sqrt(lengthSquared1) * Math.sqrt(lengthSquared2);
+
+    // correct for floating-point rounding errors
+    if (denominator < dotProduct) {
+      denominator = dotProduct;
+    }
+
+    // correct for zero-vector corner case
+    if (denominator == 0 && dotProduct == 0) {
+      return 0;
+    }
+
+    return 1.0 - dotProduct / denominator;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/distance/DistanceMeasure.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/distance/DistanceMeasure.java b/mr/src/main/java/org/apache/mahout/common/distance/DistanceMeasure.java
new file mode 100644
index 0000000..696e79c
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/distance/DistanceMeasure.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.distance;
+
+import org.apache.mahout.common.parameters.Parametered;
+import org.apache.mahout.math.Vector;
+
+/** This interface is used for objects which can determine a distance metric between two points */
+public interface DistanceMeasure extends Parametered {
+  
+  /**
+   * Returns the distance metric applied to the arguments
+   * 
+   * @param v1
+   *          a Vector defining a multidimensional point in some feature space
+   * @param v2
+   *          a Vector defining a multidimensional point in some feature space
+   * @return the distance between v1 and v2 as a scalar double
+   */
+  double distance(Vector v1, Vector v2);
+  
+  /**
+   * Optimized version of distance metric for sparse vectors. This distance computation requires operations
+   * proportional to the number of non-zero elements in the vector instead of the cardinality of the vector.
+   * 
+   * @param centroidLengthSquare
+   *          Square of the length of centroid
+   * @param centroid
+   *          Centroid vector
+   * @param v
+   *          the Vector whose distance from the centroid is computed
+   * @return the distance between centroid and v as a scalar double
+   */
+  double distance(double centroidLengthSquare, Vector centroid, Vector v);
+  
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/distance/EuclideanDistanceMeasure.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/common/distance/EuclideanDistanceMeasure.java b/mr/src/main/java/org/apache/mahout/common/distance/EuclideanDistanceMeasure.java
new file mode 100644
index 0000000..665678d
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/common/distance/EuclideanDistanceMeasure.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.distance;
+
+import org.apache.mahout.math.Vector;
+
+/**
+ * This class implements a Euclidean distance metric by taking the square root of the sum of the
+ * squared differences between each coordinate.
+ * <p/>
+ * If you don't care about the true distance and only need the values for comparison, then the base
+ * class, {@link SquaredEuclideanDistanceMeasure}, will be faster since it doesn't do the actual
+ * square root of the squared differences.
+ */
+public class EuclideanDistanceMeasure extends SquaredEuclideanDistanceMeasure {
+
+  /**
+   * @return the square root of the value computed by the superclass for the same arguments
+   */
+  @Override
+  public double distance(Vector v1, Vector v2) {
+    return Math.sqrt(super.distance(v1, v2));
+  }
+
+  /**
+   * @return the square root of the value computed by the superclass for the same arguments
+   */
+  @Override
+  public double distance(double centroidLengthSquare, Vector centroid, Vector v) {
+    return Math.sqrt(super.distance(centroidLengthSquare, centroid, v));
+  }
+}

Reply via email to