http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/AbstractJob.java
----------------------------------------------------------------------
diff --git 
a/community/mahout-mr/src/main/java/org/apache/mahout/common/AbstractJob.java 
b/community/mahout-mr/src/main/java/org/apache/mahout/common/AbstractJob.java
new file mode 100644
index 0000000..8072466
--- /dev/null
+++ 
b/community/mahout-mr/src/main/java/org/apache/mahout/common/AbstractJob.java
@@ -0,0 +1,648 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.cli2.CommandLine;
+import org.apache.commons.cli2.Group;
+import org.apache.commons.cli2.Option;
+import org.apache.commons.cli2.OptionException;
+import org.apache.commons.cli2.builder.ArgumentBuilder;
+import org.apache.commons.cli2.builder.DefaultOptionBuilder;
+import org.apache.commons.cli2.builder.GroupBuilder;
+import org.apache.commons.cli2.commandline.Parser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.lucene.AnalyzerUtils;
+import org.apache.mahout.math.VectorWritable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>Superclass of many Mahout Hadoop "jobs". A job drives configuration and 
launch of one or
+ * more maps and reduces in order to accomplish some task.</p>
+ *
+ * <p>Command line arguments available to all subclasses are:</p>
+ *
+ * <ul>
+ *  <li>--tempDir (path): Specifies a directory where the job may place temp 
files
+ *   (default "temp")</li>
+ *  <li>--help: Show help message</li>
+ * </ul>
+ *
+ * <p>In addition, note some key command line parameters that are parsed by 
Hadoop, which jobs
+ * may need to set:</p>
+ *
+ * <ul>
+ *  <li>-Dmapred.job.name=(name): Sets the Hadoop task names. It will be 
suffixed by
+ *    the mapper and reducer class names</li>
+ *  <li>-Dmapred.output.compress={true,false}: Compress final output (default 
true)</li>
+ *  <li>-Dmapred.input.dir=(path): input file, or directory containing input 
files (required)</li>
+ *  <li>-Dmapred.output.dir=(path): path to write output files (required)</li>
+ * </ul>
+ *
+ * <p>Note that because of how Hadoop parses arguments, all "-D" arguments 
must appear before all other
+ * arguments.</p>
+ */
+public abstract class AbstractJob extends Configured implements Tool {
+
+  private static final Logger log = LoggerFactory.getLogger(AbstractJob.class);
+
+  /** option used to specify the input path */
+  private Option inputOption;
+
+  /** option used to specify the output path */
+  private Option outputOption;
+
+  /** input path, populated by {@link #parseArguments(String[])} */
+  protected Path inputPath;
+  protected File inputFile; //the input represented as a file
+
+  /** output path, populated by {@link #parseArguments(String[])} */
+  protected Path outputPath;
+  protected File outputFile; //the output represented as a file
+
+  /** temp path, populated by {@link #parseArguments(String[])} */
+  protected Path tempPath;
+
+  protected Map<String, List<String>> argMap;
+
+  /** internal list of options that have been added */
+  private final List<Option> options;
+  private Group group;
+
+  protected AbstractJob() {
+    options = new LinkedList<>();
+  }
+
+  /** Returns the input path established by a call to {@link 
#parseArguments(String[])}.
+   *  The source of the path may be an input option added using {@link 
#addInputOption()}
+   *  or it may be the value of the {@code mapred.input.dir} configuration
+   *  property. 
+   */
+  protected Path getInputPath() {
+    return inputPath;
+  }
+
+  /** Returns the output path established by a call to {@link 
#parseArguments(String[])}.
+   *  The source of the path may be an output option added using {@link 
#addOutputOption()}
+   *  or it may be the value of the {@code mapred.input.dir} configuration
+   *  property. 
+   */
+  protected Path getOutputPath() {
+    return outputPath;
+  }
+
+  protected Path getOutputPath(String path) {
+    return new Path(outputPath, path);
+  }
+
+  protected File getInputFile() {
+    return inputFile;
+  }
+
+  protected File getOutputFile() {
+    return outputFile;
+  }
+
+
+  protected Path getTempPath() {
+    return tempPath;
+  }
+
+  protected Path getTempPath(String directory) {
+    return new Path(tempPath, directory);
+  }
+  
+  @Override
+  public Configuration getConf() {
+    Configuration result = super.getConf();
+    if (result == null) {
+      return new Configuration();
+    }
+    return result;
+  }
+
+  /** Add an option with no argument whose presence can be checked for using
+   *  {@code containsKey} method on the map returned by {@link 
#parseArguments(String[])};
+   */
+  protected void addFlag(String name, String shortName, String description) {
+    options.add(buildOption(name, shortName, description, false, false, null));
+  }
+
+  /** Add an option to the the set of options this job will parse when
+   *  {@link #parseArguments(String[])} is called. This options has an argument
+   *  with null as its default value.
+   */
+  protected void addOption(String name, String shortName, String description) {
+    options.add(buildOption(name, shortName, description, true, false, null));
+  }
+
+  /** Add an option to the the set of options this job will parse when
+   *  {@link #parseArguments(String[])} is called.
+   *
+   * @param required if true the {@link #parseArguments(String[])} will throw
+   *    fail with an error and usage message if this option is not specified
+   *    on the command line.
+   */
+  protected void addOption(String name, String shortName, String description, 
boolean required) {
+    options.add(buildOption(name, shortName, description, true, required, 
null));
+  }
+
+  /** Add an option to the the set of options this job will parse when
+   *  {@link #parseArguments(String[])} is called. If this option is not 
+   *  specified on the command line the default value will be 
+   *  used.
+   *
+   * @param defaultValue the default argument value if this argument is not
+   *   found on the command-line. null is allowed.
+   */
+  protected void addOption(String name, String shortName, String description, 
String defaultValue) {
+    options.add(buildOption(name, shortName, description, true, false, 
defaultValue));
+  }
+
+  /** Add an arbitrary option to the set of options this job will parse when
+   *  {@link #parseArguments(String[])} is called. If this option has no
+   *  argument, use {@code containsKey} on the map returned by
+   *  {@code parseArguments} to check for its presence. Otherwise, the
+   *  string value of the option will be placed in the map using a key
+   *  equal to this options long name preceded by '--'.
+   * @return the option added.
+   */
+  protected Option addOption(Option option) {
+    options.add(option);
+    return option;
+  }
+
+  protected Group getGroup() {
+    return group;
+  }
+
+  /** Add the default input directory option, '-i' which takes a directory
+   *  name as an argument. When {@link #parseArguments(String[])} is 
+   *  called, the inputPath will be set based upon the value for this option.
+   *  If this method is called, the input is required.
+   */
+  protected void addInputOption() {
+    this.inputOption = addOption(DefaultOptionCreator.inputOption().create());
+  }
+
+  /** Add the default output directory option, '-o' which takes a directory
+   *  name as an argument. When {@link #parseArguments(String[])} is 
+   *  called, the outputPath will be set based upon the value for this option.
+   *  If this method is called, the output is required. 
+   */
+  protected void addOutputOption() {
+    this.outputOption = 
addOption(DefaultOptionCreator.outputOption().create());
+  }
+
+  /** Build an option with the given parameters. Name and description are
+   *  required.
+   * 
+   * @param name the long name of the option prefixed with '--' on the 
command-line
+   * @param shortName the short name of the option, prefixed with '-' on the 
command-line
+   * @param description description of the option displayed in help method
+   * @param hasArg true if the option has an argument.
+   * @param required true if the option is required.
+   * @param defaultValue default argument value, can be null.
+   * @return the option.
+   */
+  protected static Option buildOption(String name,
+                                      String shortName,
+                                      String description,
+                                      boolean hasArg,
+                                      boolean required,
+                                      String defaultValue) {
+
+    return buildOption(name, shortName, description, hasArg, 1, 1, required, 
defaultValue);
+  }
+
+  protected static Option buildOption(String name,
+                                      String shortName,
+                                      String description,
+                                      boolean hasArg, int min, int max,
+                                      boolean required,
+                                      String defaultValue) {
+
+    DefaultOptionBuilder optBuilder = new 
DefaultOptionBuilder().withLongName(name).withDescription(description)
+        .withRequired(required);
+
+    if (shortName != null) {
+      optBuilder.withShortName(shortName);
+    }
+
+    if (hasArg) {
+      ArgumentBuilder argBuilder = new 
ArgumentBuilder().withName(name).withMinimum(min).withMaximum(max);
+
+      if (defaultValue != null) {
+        argBuilder = argBuilder.withDefault(defaultValue);
+      }
+
+      optBuilder.withArgument(argBuilder.create());
+    }
+
+    return optBuilder.create();
+  }
+
+  /**
+   * @param name The name of the option
+   * @return the {@link org.apache.commons.cli2.Option} with the name, else 
null
+   */
+  protected Option getCLIOption(String name) {
+    for (Option option : options) {
+      if (option.getPreferredName().equals(name)) {
+        return option;
+      }
+    }
+    return null;
+  }
+
+  /** Parse the arguments specified based on the options defined using the 
+   *  various {@code addOption} methods. If -h is specified or an
+   *  exception is encountered print help and return null. Has the 
+   *  side effect of setting inputPath and outputPath 
+   *  if {@code addInputOption} or {@code addOutputOption}
+   *  or {@code mapred.input.dir} or {@code mapred.output.dir}
+   *  are present in the Configuration.
+   *
+   * @return a {@code Map<String,String>} containing options and their 
argument values.
+   *  The presence of a flag can be tested using {@code containsKey}, while
+   *  argument values can be retrieved using {@code get(optionName)}. The
+   *  names used for keys are the option name parameter prefixed by '--'.
+   *
+   * @see #parseArguments(String[], boolean, boolean)  -- passes in false, 
false for the optional args.
+   */
+  public Map<String, List<String>> parseArguments(String[] args) throws 
IOException {
+    return parseArguments(args, false, false);
+  }
+
+  /**
+   *
+   * @param args  The args to parse
+   * @param inputOptional if false, then the input option, if set, need not be 
present.  If true and input is an option
+   *                      and there is no input, then throw an error
+   * @param outputOptional if false, then the output option, if set, need not 
be present.  If true and output is an
+   *                       option and there is no output, then throw an error
+   * @return the args parsed into a map.
+   */
+  public Map<String, List<String>> parseArguments(String[] args, boolean 
inputOptional, boolean outputOptional)
+    throws IOException {
+    Option helpOpt = addOption(DefaultOptionCreator.helpOption());
+    addOption("tempDir", null, "Intermediate output directory", "temp");
+    addOption("startPhase", null, "First phase to run", "0");
+    addOption("endPhase", null, "Last phase to run", 
String.valueOf(Integer.MAX_VALUE));
+
+    GroupBuilder gBuilder = new GroupBuilder().withName("Job-Specific 
Options:");
+
+    for (Option opt : options) {
+      gBuilder = gBuilder.withOption(opt);
+    }
+
+    group = gBuilder.create();
+
+    CommandLine cmdLine;
+    try {
+      Parser parser = new Parser();
+      parser.setGroup(group);
+      parser.setHelpOption(helpOpt);
+      cmdLine = parser.parse(args);
+
+    } catch (OptionException e) {
+      log.error(e.getMessage());
+      CommandLineUtil.printHelpWithGenericOptions(group, e);
+      return null;
+    }
+
+    if (cmdLine.hasOption(helpOpt)) {
+      CommandLineUtil.printHelpWithGenericOptions(group);
+      return null;
+    }
+
+    try {
+      parseDirectories(cmdLine, inputOptional, outputOptional);
+    } catch (IllegalArgumentException e) {
+      log.error(e.getMessage());
+      CommandLineUtil.printHelpWithGenericOptions(group);
+      return null;
+    }
+
+    argMap = new TreeMap<>();
+    maybePut(argMap, cmdLine, this.options.toArray(new 
Option[this.options.size()]));
+
+    this.tempPath = new Path(getOption("tempDir"));
+
+    if (!hasOption("quiet")) {
+      log.info("Command line arguments: {}", argMap);
+    }
+    return argMap;
+  }
+  
+  /**
+   * Build the option key (--name) from the option name
+   */
+  public static String keyFor(String optionName) {
+    return "--" + optionName;
+  }
+
+  /**
+   * @return the requested option, or null if it has not been specified
+   */
+  public String getOption(String optionName) {
+    List<String> list = argMap.get(keyFor(optionName));
+    if (list != null && !list.isEmpty()) {
+      return list.get(0);
+    }
+    return null;
+  }
+
+  /**
+   * Get the option, else the default
+   * @param optionName The name of the option to look up, without the --
+   * @param defaultVal The default value.
+   * @return The requested option, else the default value if it doesn't exist
+   */
+  public String getOption(String optionName, String defaultVal) {
+    String res = getOption(optionName);
+    if (res == null) {
+      res = defaultVal;
+    }
+    return res;
+  }
+
+  public int getInt(String optionName) {
+    return Integer.parseInt(getOption(optionName));
+  }
+
+  public int getInt(String optionName, int defaultVal) {
+    return Integer.parseInt(getOption(optionName, String.valueOf(defaultVal)));
+  }
+
+  public float getFloat(String optionName) {
+    return Float.parseFloat(getOption(optionName));
+  }
+
+  public float getFloat(String optionName, float defaultVal) {
+    return Float.parseFloat(getOption(optionName, String.valueOf(defaultVal)));
+  }
+
+  /**
+   * Options can occur multiple times, so return the list
+   * @param optionName The unadorned (no "--" prefixing it) option name
+   * @return The values, else null.  If the option is present, but has no 
values, then the result will be an
+   * empty list (Collections.emptyList())
+   */
+  public List<String> getOptions(String optionName) {
+    return argMap.get(keyFor(optionName));
+  }
+
+  /**
+   * @return if the requested option has been specified
+   */
+  public boolean hasOption(String optionName) {
+    return argMap.containsKey(keyFor(optionName));
+  }
+
+
+  /**
+   * Get the cardinality of the input vectors
+   *
+   * @param matrix
+   * @return the cardinality of the vector
+   */
+  public int getDimensions(Path matrix) throws IOException {
+    try (SequenceFile.Reader reader = new 
SequenceFile.Reader(FileSystem.get(getConf()), matrix, getConf())){
+      Writable row = 
ClassUtils.instantiateAs(reader.getKeyClass().asSubclass(Writable.class), 
Writable.class);
+      
Preconditions.checkArgument(reader.getValueClass().equals(VectorWritable.class),
+          "value type of sequencefile must be a VectorWritable");
+
+      VectorWritable vectorWritable = new VectorWritable();
+      boolean hasAtLeastOneRow = reader.next(row, vectorWritable);
+      Preconditions.checkState(hasAtLeastOneRow, "matrix must have at least 
one row");
+      return vectorWritable.get().size();
+    }
+  }
+
+  /**
+   * Obtain input and output directories from command-line options or hadoop
+   *  properties. If {@code addInputOption} or {@code addOutputOption}
+   *  has been called, this method will throw an {@code OptionException} if
+   *  no source (command-line or property) for that value is present. 
+   *  Otherwise, {@code inputPath} or {@code outputPath} will be
+   *  non-null only if specified as a hadoop property. Command-line options
+   *  take precedence over hadoop properties.
+   *
+   * @throws IllegalArgumentException if either inputOption is present,
+   *   and neither {@code --input} nor {@code -Dmapred.input dir} are
+   *   specified or outputOption is present and neither {@code --output}
+   *   nor {@code -Dmapred.output.dir} are specified.
+   */
+  protected void parseDirectories(CommandLine cmdLine, boolean inputOptional, 
boolean outputOptional) {
+
+    Configuration conf = getConf();
+
+    if (inputOption != null && cmdLine.hasOption(inputOption)) {
+      this.inputPath = new Path(cmdLine.getValue(inputOption).toString());
+      this.inputFile = new File(cmdLine.getValue(inputOption).toString());
+    }
+    if (inputPath == null && conf.get("mapred.input.dir") != null) {
+      this.inputPath = new Path(conf.get("mapred.input.dir"));
+    }
+
+    if (outputOption != null && cmdLine.hasOption(outputOption)) {
+      this.outputPath = new Path(cmdLine.getValue(outputOption).toString());
+      this.outputFile = new File(cmdLine.getValue(outputOption).toString());
+    }
+    if (outputPath == null && conf.get("mapred.output.dir") != null) {
+      this.outputPath = new Path(conf.get("mapred.output.dir"));
+    }
+
+    Preconditions.checkArgument(inputOptional || inputOption == null || 
inputPath != null,
+        "No input specified or -Dmapred.input.dir must be provided to specify 
input directory");
+    Preconditions.checkArgument(outputOptional || outputOption == null || 
outputPath != null,
+        "No output specified:  or -Dmapred.output.dir must be provided to 
specify output directory");
+  }
+
+  protected static void maybePut(Map<String, List<String>> args, CommandLine 
cmdLine, Option... opt) {
+    for (Option o : opt) {
+
+      // the option appeared on the command-line, or it has a value
+      // (which is likely a default value). 
+      if (cmdLine.hasOption(o) || cmdLine.getValue(o) != null
+          || (cmdLine.getValues(o) != null && 
!cmdLine.getValues(o).isEmpty())) {
+
+        // nulls are ok, for cases where options are simple flags.
+        List<?> vo = cmdLine.getValues(o);
+        if (vo != null && !vo.isEmpty()) {
+          List<String> vals = new ArrayList<>();
+          for (Object o1 : vo) {
+            vals.add(o1.toString());
+          }
+          args.put(o.getPreferredName(), vals);
+        } else {
+          args.put(o.getPreferredName(), null);
+        }
+      }
+    }
+  }
+
+  /**
+   *
+   * @param args The input argument map
+   * @param optName The adorned (including "--") option name
+   * @return The first value in the match, else null
+   */
+  public static String getOption(Map<String, List<String>> args, String 
optName) {
+    List<String> res = args.get(optName);
+    if (res != null && !res.isEmpty()) {
+      return res.get(0);
+    }
+    return null;
+  }
+
+
+  protected static boolean shouldRunNextPhase(Map<String, List<String>> args, 
AtomicInteger currentPhase) {
+    int phase = currentPhase.getAndIncrement();
+    String startPhase = getOption(args, "--startPhase");
+    String endPhase = getOption(args, "--endPhase");
+    boolean phaseSkipped = (startPhase != null && phase < 
Integer.parseInt(startPhase))
+        || (endPhase != null && phase > Integer.parseInt(endPhase));
+    if (phaseSkipped) {
+      log.info("Skipping phase {}", phase);
+    }
+    return !phaseSkipped;
+  }
+
+  protected Job prepareJob(Path inputPath,
+                           Path outputPath,
+                           Class<? extends InputFormat> inputFormat,
+                           Class<? extends Mapper> mapper,
+                           Class<? extends Writable> mapperKey,
+                           Class<? extends Writable> mapperValue,
+                           Class<? extends OutputFormat> outputFormat) throws 
IOException {
+    return prepareJob(inputPath, outputPath, inputFormat, mapper, mapperKey, 
mapperValue, outputFormat, null);
+
+  }
+  protected Job prepareJob(Path inputPath,
+                           Path outputPath,
+                           Class<? extends InputFormat> inputFormat,
+                           Class<? extends Mapper> mapper,
+                           Class<? extends Writable> mapperKey,
+                           Class<? extends Writable> mapperValue,
+                           Class<? extends OutputFormat> outputFormat,
+                           String jobname) throws IOException {
+
+    Job job = HadoopUtil.prepareJob(inputPath, outputPath,
+            inputFormat, mapper, mapperKey, mapperValue, outputFormat, 
getConf());
+
+    String name =
+        jobname != null ? jobname : 
HadoopUtil.getCustomJobName(getClass().getSimpleName(), job, mapper, 
Reducer.class);
+
+    job.setJobName(name);
+    return job;
+
+  }
+
+  protected Job prepareJob(Path inputPath, Path outputPath, Class<? extends 
Mapper> mapper,
+      Class<? extends Writable> mapperKey, Class<? extends Writable> 
mapperValue, Class<? extends Reducer> reducer,
+      Class<? extends Writable> reducerKey, Class<? extends Writable> 
reducerValue) throws IOException {
+    return prepareJob(inputPath, outputPath, SequenceFileInputFormat.class, 
mapper, mapperKey, mapperValue, reducer,
+        reducerKey, reducerValue, SequenceFileOutputFormat.class);
+  }
+
+  protected Job prepareJob(Path inputPath,
+                           Path outputPath,
+                           Class<? extends InputFormat> inputFormat,
+                           Class<? extends Mapper> mapper,
+                           Class<? extends Writable> mapperKey,
+                           Class<? extends Writable> mapperValue,
+                           Class<? extends Reducer> reducer,
+                           Class<? extends Writable> reducerKey,
+                           Class<? extends Writable> reducerValue,
+                           Class<? extends OutputFormat> outputFormat) throws 
IOException {
+    Job job = HadoopUtil.prepareJob(inputPath, outputPath,
+            inputFormat, mapper, mapperKey, mapperValue, reducer, reducerKey, 
reducerValue, outputFormat, getConf());
+    job.setJobName(HadoopUtil.getCustomJobName(getClass().getSimpleName(), 
job, mapper, Reducer.class));
+    return job;
+  }
+
+  /**
+   * necessary to make this job (having a combined input path) work on Amazon 
S3, hopefully this is
+   * obsolete when MultipleInputs is available again
+   */
+  public static void setS3SafeCombinedInputPath(Job job, Path referencePath, 
Path inputPathOne, Path inputPathTwo)
+    throws IOException {
+    FileSystem fs = FileSystem.get(referencePath.toUri(), 
job.getConfiguration());
+    FileInputFormat.setInputPaths(job, inputPathOne.makeQualified(fs), 
inputPathTwo.makeQualified(fs));
+  }
+
+  protected Class<? extends Analyzer> getAnalyzerClassFromOption() throws 
ClassNotFoundException {
+    Class<? extends Analyzer> analyzerClass = StandardAnalyzer.class;
+    if (hasOption(DefaultOptionCreator.ANALYZER_NAME_OPTION)) {
+      String className = getOption(DefaultOptionCreator.ANALYZER_NAME_OPTION);
+      analyzerClass = Class.forName(className).asSubclass(Analyzer.class);
+      // try instantiating it, b/c there isn't any point in setting it if
+      // you can't instantiate it
+      //ClassUtils.instantiateAs(analyzerClass, Analyzer.class);
+      AnalyzerUtils.createAnalyzer(analyzerClass);
+    }
+    return analyzerClass;
+  }
+  
+  /**
+   * Overrides the base implementation to install the Oozie action 
configuration resource
+   * into the provided Configuration object; note that ToolRunner calls 
setConf on the Tool
+   * before it invokes run.
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    super.setConf(conf);
+      
+    // If running in an Oozie workflow as a Java action, need to add the
+    // Configuration resource provided by Oozie to this job's config.
+    String oozieActionConfXml = System.getProperty("oozie.action.conf.xml");
+    if (oozieActionConfXml != null && conf != null) {
+      conf.addResource(new Path("file:///", oozieActionConfXml));
+      log.info("Added Oozie action Configuration resource {} to the Hadoop 
Configuration", oozieActionConfXml);
+    }      
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/ClassUtils.java
----------------------------------------------------------------------
diff --git 
a/community/mahout-mr/src/main/java/org/apache/mahout/common/ClassUtils.java 
b/community/mahout-mr/src/main/java/org/apache/mahout/common/ClassUtils.java
new file mode 100644
index 0000000..8052ef1
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/ClassUtils.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common;
+
+import java.lang.reflect.InvocationTargetException;
+
+public final class ClassUtils {
+
+  private ClassUtils() {}
+
+  public static <T> T instantiateAs(String classname, Class<T> 
asSubclassOfClass) {
+    try {
+      return 
instantiateAs(Class.forName(classname).asSubclass(asSubclassOfClass), 
asSubclassOfClass);
+    } catch (ClassNotFoundException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  public static <T> T instantiateAs(String classname, Class<T> 
asSubclassOfClass, Class<?>[] params, Object[] args) {
+    try {
+      return 
instantiateAs(Class.forName(classname).asSubclass(asSubclassOfClass), 
asSubclassOfClass, params, args);
+    } catch (ClassNotFoundException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  public static <T> T instantiateAs(Class<? extends T> clazz,
+                                    Class<T> asSubclassOfClass,
+                                    Class<?>[] params,
+                                    Object[] args) {
+    try {
+      return 
clazz.asSubclass(asSubclassOfClass).getConstructor(params).newInstance(args);
+    } catch (InstantiationException | IllegalAccessException | 
NoSuchMethodException | InvocationTargetException ie) {
+      throw new IllegalStateException(ie);
+    }
+  }
+
+
+  public static <T> T instantiateAs(Class<? extends T> clazz, Class<T> 
asSubclassOfClass) {
+    try {
+      return 
clazz.asSubclass(asSubclassOfClass).getConstructor().newInstance();
+    } catch (InstantiationException | IllegalAccessException | 
NoSuchMethodException | InvocationTargetException ie) {
+      throw new IllegalStateException(ie);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/CommandLineUtil.java
----------------------------------------------------------------------
diff --git 
a/community/mahout-mr/src/main/java/org/apache/mahout/common/CommandLineUtil.java
 
b/community/mahout-mr/src/main/java/org/apache/mahout/common/CommandLineUtil.java
new file mode 100644
index 0000000..ac4ab88
--- /dev/null
+++ 
b/community/mahout-mr/src/main/java/org/apache/mahout/common/CommandLineUtil.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+
+import org.apache.commons.cli2.Group;
+import org.apache.commons.cli2.OptionException;
+import org.apache.commons.cli2.util.HelpFormatter;
+import org.apache.commons.io.Charsets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.GenericOptionsParser;
+
+public final class CommandLineUtil {
+  
+  private CommandLineUtil() { }
+  
+  public static void printHelp(Group group) {
+    HelpFormatter formatter = new HelpFormatter();
+    formatter.setGroup(group);
+    formatter.print();
+  }
+ 
+  /**
+   * Print the options supported by {@code GenericOptionsParser}.
+   * In addition to the options supported by the job, passed in as the
+   * group parameter.
+   *
+   * @param group job-specific command-line options.
+   */
+  public static void printHelpWithGenericOptions(Group group) throws 
IOException {
+    new GenericOptionsParser(new Configuration(), new 
org.apache.commons.cli.Options(), new String[0]);
+    PrintWriter pw = new PrintWriter(new OutputStreamWriter(System.out, 
Charsets.UTF_8), true);
+    HelpFormatter formatter = new HelpFormatter();
+    formatter.setGroup(group);
+    formatter.setPrintWriter(pw);
+    formatter.setFooter("Specify HDFS directories while running on hadoop; 
else specify local file system directories");
+    formatter.print();
+  }
+
+  public static void printHelpWithGenericOptions(Group group, OptionException 
oe) throws IOException {
+    new GenericOptionsParser(new Configuration(), new 
org.apache.commons.cli.Options(), new String[0]);
+    PrintWriter pw = new PrintWriter(new OutputStreamWriter(System.out, 
Charsets.UTF_8), true);
+    HelpFormatter formatter = new HelpFormatter();
+    formatter.setGroup(group);
+    formatter.setPrintWriter(pw);
+    formatter.setException(oe);
+    formatter.print();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/HadoopUtil.java
----------------------------------------------------------------------
diff --git 
a/community/mahout-mr/src/main/java/org/apache/mahout/common/HadoopUtil.java 
b/community/mahout-mr/src/main/java/org/apache/mahout/common/HadoopUtil.java
new file mode 100644
index 0000000..34515aa
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/HadoopUtil.java
@@ -0,0 +1,435 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import 
org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterator;
+import 
org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class HadoopUtil {
+
+  private static final Logger log = LoggerFactory.getLogger(HadoopUtil.class);
+
+  private HadoopUtil() { }
+
+  /**
+   * Create a map-only Hadoop Job out of the passed in parameters.  Does not 
set the
+   * Job name.
+   *
+   * @see #getCustomJobName(String, org.apache.hadoop.mapreduce.JobContext, 
Class, Class)
+   */
+  public static Job prepareJob(Path inputPath,
+                           Path outputPath,
+                           Class<? extends InputFormat> inputFormat,
+                           Class<? extends Mapper> mapper,
+                           Class<? extends Writable> mapperKey,
+                           Class<? extends Writable> mapperValue,
+                           Class<? extends OutputFormat> outputFormat, 
Configuration conf) throws IOException {
+
+    Job job = new Job(new Configuration(conf));
+    Configuration jobConf = job.getConfiguration();
+
+    if (mapper.equals(Mapper.class)) {
+      throw new IllegalStateException("Can't figure out the user class jar 
file from mapper/reducer");
+    }
+    job.setJarByClass(mapper);
+
+    job.setInputFormatClass(inputFormat);
+    jobConf.set("mapred.input.dir", inputPath.toString());
+
+    job.setMapperClass(mapper);
+    job.setMapOutputKeyClass(mapperKey);
+    job.setMapOutputValueClass(mapperValue);
+    job.setOutputKeyClass(mapperKey);
+    job.setOutputValueClass(mapperValue);
+    jobConf.setBoolean("mapred.compress.map.output", true);
+    job.setNumReduceTasks(0);
+
+    job.setOutputFormatClass(outputFormat);
+    jobConf.set("mapred.output.dir", outputPath.toString());
+
+    return job;
+  }
+
+  /**
+   * Create a map and reduce Hadoop job.  Does not set the name on the job.
+   * @param inputPath The input {@link org.apache.hadoop.fs.Path}
+   * @param outputPath The output {@link org.apache.hadoop.fs.Path}
+   * @param inputFormat The {@link org.apache.hadoop.mapreduce.InputFormat}
+   * @param mapper The {@link org.apache.hadoop.mapreduce.Mapper} class to use
+   * @param mapperKey The {@link org.apache.hadoop.io.Writable} key class.  If 
the Mapper is a no-op,
+   *                  this value may be null
+   * @param mapperValue The {@link org.apache.hadoop.io.Writable} value class. 
 If the Mapper is a no-op,
+   *                    this value may be null
+   * @param reducer The {@link org.apache.hadoop.mapreduce.Reducer} to use
+   * @param reducerKey The reducer key class.
+   * @param reducerValue The reducer value class.
+   * @param outputFormat The {@link org.apache.hadoop.mapreduce.OutputFormat}.
+   * @param conf The {@link org.apache.hadoop.conf.Configuration} to use.
+   * @return The {@link org.apache.hadoop.mapreduce.Job}.
+   * @throws IOException if there is a problem with the IO.
+   *
+   * @see #getCustomJobName(String, org.apache.hadoop.mapreduce.JobContext, 
Class, Class)
+   * @see #prepareJob(org.apache.hadoop.fs.Path, org.apache.hadoop.fs.Path, 
Class, Class, Class, Class, Class,
+   * org.apache.hadoop.conf.Configuration)
+   */
+  public static Job prepareJob(Path inputPath,
+                           Path outputPath,
+                           Class<? extends InputFormat> inputFormat,
+                           Class<? extends Mapper> mapper,
+                           Class<? extends Writable> mapperKey,
+                           Class<? extends Writable> mapperValue,
+                           Class<? extends Reducer> reducer,
+                           Class<? extends Writable> reducerKey,
+                           Class<? extends Writable> reducerValue,
+                           Class<? extends OutputFormat> outputFormat,
+                           Configuration conf) throws IOException {
+
+    Job job = new Job(new Configuration(conf));
+    Configuration jobConf = job.getConfiguration();
+
+    if (reducer.equals(Reducer.class)) {
+      if (mapper.equals(Mapper.class)) {
+        throw new IllegalStateException("Can't figure out the user class jar 
file from mapper/reducer");
+      }
+      job.setJarByClass(mapper);
+    } else {
+      job.setJarByClass(reducer);
+    }
+
+    job.setInputFormatClass(inputFormat);
+    jobConf.set("mapred.input.dir", inputPath.toString());
+
+    job.setMapperClass(mapper);
+    if (mapperKey != null) {
+      job.setMapOutputKeyClass(mapperKey);
+    }
+    if (mapperValue != null) {
+      job.setMapOutputValueClass(mapperValue);
+    }
+
+    jobConf.setBoolean("mapred.compress.map.output", true);
+
+    job.setReducerClass(reducer);
+    job.setOutputKeyClass(reducerKey);
+    job.setOutputValueClass(reducerValue);
+
+    job.setOutputFormatClass(outputFormat);
+    jobConf.set("mapred.output.dir", outputPath.toString());
+
+    return job;
+  }
+
+
+  public static String getCustomJobName(String className, JobContext job,
+                                  Class<? extends Mapper> mapper,
+                                  Class<? extends Reducer> reducer) {
+    StringBuilder name = new StringBuilder(100);
+    String customJobName = job.getJobName();
+    if (customJobName == null || customJobName.trim().isEmpty()) {
+      name.append(className);
+    } else {
+      name.append(customJobName);
+    }
+    name.append('-').append(mapper.getSimpleName());
+    name.append('-').append(reducer.getSimpleName());
+    return name.toString();
+  }
+
+
+  public static void delete(Configuration conf, Iterable<Path> paths) throws 
IOException {
+    if (conf == null) {
+      conf = new Configuration();
+    }
+    for (Path path : paths) {
+      FileSystem fs = path.getFileSystem(conf);
+      if (fs.exists(path)) {
+        log.info("Deleting {}", path);
+        fs.delete(path, true);
+      }
+    }
+  }
+
+  public static void delete(Configuration conf, Path... paths) throws 
IOException {
+    delete(conf, Arrays.asList(paths));
+  }
+
+  public static long countRecords(Path path, Configuration conf) throws 
IOException {
+    long count = 0;
+    Iterator<?> iterator = new SequenceFileValueIterator<>(path, true, conf);
+    while (iterator.hasNext()) {
+      iterator.next();
+      count++;
+    }
+    return count;
+  }
+
+  /**
+   * Count all the records in a directory using a
+   * {@link 
org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterator}
+   *
+   * @param path The {@link org.apache.hadoop.fs.Path} to count
+   * @param pt The {@link 
org.apache.mahout.common.iterator.sequencefile.PathType}
+   * @param filter Apply the {@link org.apache.hadoop.fs.PathFilter}.  May be 
null
+   * @param conf The Hadoop {@link org.apache.hadoop.conf.Configuration}
+   * @return The number of records
+   * @throws IOException if there was an IO error
+   */
+  public static long countRecords(Path path, PathType pt, PathFilter filter, 
Configuration conf) throws IOException {
+    long count = 0;
+    Iterator<?> iterator = new SequenceFileDirValueIterator<>(path, pt, 
filter, null, true, conf);
+    while (iterator.hasNext()) {
+      iterator.next();
+      count++;
+    }
+    return count;
+  }
+
+  public static InputStream openStream(Path path, Configuration conf) throws 
IOException {
+    FileSystem fs = FileSystem.get(path.toUri(), conf);
+    return fs.open(path.makeQualified(path.toUri(), path));
+  }
+
+  public static FileStatus[] getFileStatus(Path path, PathType pathType, 
PathFilter filter,
+      Comparator<FileStatus> ordering, Configuration conf) throws IOException {
+    FileStatus[] statuses;
+    FileSystem fs = path.getFileSystem(conf);
+    if (filter == null) {
+      statuses = pathType == PathType.GLOB ? fs.globStatus(path) : 
listStatus(fs, path);
+    } else {
+      statuses = pathType == PathType.GLOB ? fs.globStatus(path, filter) : 
listStatus(fs, path, filter);
+    }
+    if (ordering != null) {
+      Arrays.sort(statuses, ordering);
+    }
+    return statuses;
+  }
+
+  public static FileStatus[] listStatus(FileSystem fs, Path path) throws 
IOException {
+    try {
+      return fs.listStatus(path);
+    } catch (FileNotFoundException e) {
+      return new FileStatus[0];
+    }
+  }
+
+  public static FileStatus[] listStatus(FileSystem fs, Path path, PathFilter 
filter) throws IOException {
+    try {
+      return fs.listStatus(path, filter);
+    } catch (FileNotFoundException e) {
+      return new FileStatus[0];
+    }
+  }
+
+  public static void cacheFiles(Path fileToCache, Configuration conf) {
+    DistributedCache.setCacheFiles(new URI[]{fileToCache.toUri()}, conf);
+  }
+
+  /**
+   * Return the first cached file in the list, else null if thre are no cached 
files.
+   * @param conf - MapReduce Configuration
+   * @return Path of Cached file
+   * @throws IOException - IO Exception
+   */
+  public static Path getSingleCachedFile(Configuration conf) throws 
IOException {
+    return getCachedFiles(conf)[0];
+  }
+
+  /**
+   * Retrieves paths to cached files.
+   * @param conf - MapReduce Configuration
+   * @return Path[] of Cached Files
+   * @throws IOException - IO Exception
+   * @throws IllegalStateException if no cache files are found
+   */
+  public static Path[] getCachedFiles(Configuration conf) throws IOException {
+    LocalFileSystem localFs = FileSystem.getLocal(conf);
+    Path[] cacheFiles = DistributedCache.getLocalCacheFiles(conf);
+
+    URI[] fallbackFiles = DistributedCache.getCacheFiles(conf);
+
+    // fallback for local execution
+    if (cacheFiles == null) {
+
+      Preconditions.checkState(fallbackFiles != null, "Unable to find cached 
files!");
+
+      cacheFiles = new Path[fallbackFiles.length];
+      for (int n = 0; n < fallbackFiles.length; n++) {
+        cacheFiles[n] = new Path(fallbackFiles[n].getPath());
+      }
+    } else {
+
+      for (int n = 0; n < cacheFiles.length; n++) {
+        cacheFiles[n] = localFs.makeQualified(cacheFiles[n]);
+        // fallback for local execution
+        if (!localFs.exists(cacheFiles[n])) {
+          cacheFiles[n] = new Path(fallbackFiles[n].getPath());
+        }
+      }
+    }
+
+    Preconditions.checkState(cacheFiles.length > 0, "Unable to find cached 
files!");
+
+    return cacheFiles;
+  }
+
+  public static void setSerializations(Configuration configuration) {
+    configuration.set("io.serializations", 
"org.apache.hadoop.io.serializer.JavaSerialization,"
+        + "org.apache.hadoop.io.serializer.WritableSerialization");
+  }
+
+  public static void writeInt(int value, Path path, Configuration 
configuration) throws IOException {
+    FileSystem fs = FileSystem.get(path.toUri(), configuration);
+    try (FSDataOutputStream out = fs.create(path)) {
+      out.writeInt(value);
+    }
+  }
+
+  public static int readInt(Path path, Configuration configuration) throws 
IOException {
+    FileSystem fs = FileSystem.get(path.toUri(), configuration);
+    try (FSDataInputStream in = fs.open(path)) {
+      return in.readInt();
+    }
+  }
+
+  /**
+   * Builds a comma-separated list of input splits
+   * @param fs - File System
+   * @param fileStatus - File Status
+   * @return list of directories as a comma-separated String
+   * @throws IOException - IO Exception
+   */
+  public static String buildDirList(FileSystem fs, FileStatus fileStatus) 
throws IOException {
+    boolean containsFiles = false;
+    List<String> directoriesList = new ArrayList<>();
+    for (FileStatus childFileStatus : fs.listStatus(fileStatus.getPath())) {
+      if (childFileStatus.isDir()) {
+        String subDirectoryList = buildDirList(fs, childFileStatus);
+        directoriesList.add(subDirectoryList);
+      } else {
+        containsFiles = true;
+      }
+    }
+
+    if (containsFiles) {
+      directoriesList.add(fileStatus.getPath().toUri().getPath());
+    }
+    return Joiner.on(',').skipNulls().join(directoriesList.iterator());
+  }
+
+  /**
+   * Builds a comma-separated list of input splits
+   * @param fs - File System
+   * @param fileStatus - File Status
+   * @param pathFilter - path filter
+   * @return list of directories as a comma-separated String
+   * @throws IOException - IO Exception
+   */
+  public static String buildDirList(FileSystem fs, FileStatus fileStatus, 
PathFilter pathFilter) throws IOException {
+    boolean containsFiles = false;
+    List<String> directoriesList = new ArrayList<>();
+    for (FileStatus childFileStatus : fs.listStatus(fileStatus.getPath(), 
pathFilter)) {
+      if (childFileStatus.isDir()) {
+        String subDirectoryList = buildDirList(fs, childFileStatus);
+        directoriesList.add(subDirectoryList);
+      } else {
+        containsFiles = true;
+      }
+    }
+
+    if (containsFiles) {
+      directoriesList.add(fileStatus.getPath().toUri().getPath());
+    }
+    return Joiner.on(',').skipNulls().join(directoriesList.iterator());
+  }
+
+  /**
+   *
+   * @param configuration  -  configuration
+   * @param filePath - Input File Path
+   * @return relative file Path
+   * @throws IOException - IO Exception
+   */
+  public static String calcRelativeFilePath(Configuration configuration, Path 
filePath) throws IOException {
+    FileSystem fs = filePath.getFileSystem(configuration);
+    FileStatus fst = fs.getFileStatus(filePath);
+    String currentPath = fst.getPath().toString().replaceFirst("file:", "");
+
+    String basePath = configuration.get("baseinputpath");
+    if (!basePath.endsWith("/")) {
+      basePath += "/";
+    }
+    basePath = basePath.replaceFirst("file:", "");
+    String[] parts = currentPath.split(basePath);
+
+    if (parts.length == 2) {
+      return parts[1];
+    } else if (parts.length == 1) {
+      return parts[0];
+    }
+    return currentPath;
+  }
+
+  /**
+   * Finds a file in the DistributedCache
+   *
+   * @param partOfFilename a substring of the file name
+   * @param localFiles holds references to files stored in distributed cache
+   * @return Path to first matched file or null if nothing was found
+   **/
+  public static Path findInCacheByPartOfFilename(String partOfFilename, URI[] 
localFiles) {
+    for (URI distCacheFile : localFiles) {
+      log.info("trying find a file in distributed cache containing [{}] in its 
name", partOfFilename);
+      if (distCacheFile != null && 
distCacheFile.toString().contains(partOfFilename)) {
+        log.info("found file [{}] containing [{}]", distCacheFile.toString(), 
partOfFilename);
+        return new Path(distCacheFile.getPath());
+      }
+    }
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/IntPairWritable.java
----------------------------------------------------------------------
diff --git 
a/community/mahout-mr/src/main/java/org/apache/mahout/common/IntPairWritable.java
 
b/community/mahout-mr/src/main/java/org/apache/mahout/common/IntPairWritable.java
new file mode 100644
index 0000000..dacd66f
--- /dev/null
+++ 
b/community/mahout-mr/src/main/java/org/apache/mahout/common/IntPairWritable.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common;
+
+import org.apache.hadoop.io.BinaryComparable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+
+/**
+ * A {@link WritableComparable} which encapsulates an ordered pair of signed 
integers.
+ */
+public final class IntPairWritable extends BinaryComparable
+    implements WritableComparable<BinaryComparable>, Cloneable {
+
+  static final int INT_BYTE_LENGTH = 4;
+  static final int INT_PAIR_BYTE_LENGTH = 2 * INT_BYTE_LENGTH;
+  private byte[] b = new byte[INT_PAIR_BYTE_LENGTH];
+  
+  public IntPairWritable() {
+    setFirst(0);
+    setSecond(0);
+  }
+  
+  public IntPairWritable(IntPairWritable pair) {
+    b = Arrays.copyOf(pair.getBytes(), INT_PAIR_BYTE_LENGTH);
+  }
+  
+  public IntPairWritable(int x, int y) {
+    putInt(x, b, 0);
+    putInt(y, b, INT_BYTE_LENGTH);
+  }
+  
+  public void set(int x, int y) {
+    putInt(x, b, 0);
+    putInt(y, b, INT_BYTE_LENGTH);
+  }
+  
+  public void setFirst(int x) {
+    putInt(x, b, 0);
+  }
+  
+  public int getFirst() {
+    return getInt(b, 0);
+  }
+  
+  public void setSecond(int y) {
+    putInt(y, b, INT_BYTE_LENGTH);
+  }
+  
+  public int getSecond() {
+    return getInt(b, INT_BYTE_LENGTH);
+  }
+  
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    in.readFully(b);
+  }
+  
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.write(b);
+  }
+  
+  @Override
+  public int hashCode() {
+    return Arrays.hashCode(b);
+  }
+  
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (!super.equals(obj)) {
+      return false;
+    }
+    if (!(obj instanceof IntPairWritable)) {
+      return false;
+    }
+    IntPairWritable other = (IntPairWritable) obj;
+    return Arrays.equals(b, other.b);
+  }
+
+  @Override
+  public int compareTo(BinaryComparable other) {
+    return Comparator.doCompare(b, 0, ((IntPairWritable) other).b, 0);
+  }
+
+  @Override
+  public Object clone() {
+    return new IntPairWritable(this);
+  }
+  
+  @Override
+  public String toString() {
+    return "(" + getFirst() + ", " + getSecond() + ')';
+  }
+  
+  @Override
+  public byte[] getBytes() {
+    return b;
+  }
+  
+  @Override
+  public int getLength() {
+    return INT_PAIR_BYTE_LENGTH;
+  }
+  
+  private static void putInt(int value, byte[] b, int offset) {
+    for (int i = offset, j = 24; j >= 0; i++, j -= 8) {
+      b[i] = (byte) (value >> j);
+    }
+  }
+  
+  private static int getInt(byte[] b, int offset) {
+    int value = 0;
+    for (int i = offset, j = 24; j >= 0; i++, j -= 8) {
+      value |= (b[i] & 0xFF) << j;
+    }
+    return value;
+  }
+
+  static {
+    WritableComparator.define(IntPairWritable.class, new Comparator());
+  }
+
+  public static final class Comparator extends WritableComparator implements 
Serializable {
+    public Comparator() {
+      super(IntPairWritable.class);
+    }
+    
+    @Override
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+      return doCompare(b1, s1, b2, s2);
+    }
+
+    static int doCompare(byte[] b1, int s1, byte[] b2, int s2) {
+      int compare1 = compareInts(b1, s1, b2, s2);
+      if (compare1 != 0) {
+        return compare1;
+      }
+      return compareInts(b1, s1 + INT_BYTE_LENGTH, b2, s2 + INT_BYTE_LENGTH);
+    }
+
+    private static int compareInts(byte[] b1, int s1, byte[] b2, int s2) {
+      // Like WritableComparator.compareBytes(), but treats first byte as 
signed value
+      int end1 = s1 + INT_BYTE_LENGTH;
+      for (int i = s1, j = s2; i < end1; i++, j++) {
+        int a = b1[i];
+        int b = b2[j];
+        if (i > s1) {
+          a &= 0xff;
+          b &= 0xff;
+        }
+        if (a != b) {
+          return a - b;
+        }
+      }
+      return 0;
+    }
+  }
+  
+  /**
+   * Compare only the first part of the pair, so that reduce is called once 
for each value of the first part.
+   */
+  public static class FirstGroupingComparator extends WritableComparator 
implements Serializable {
+    
+    public FirstGroupingComparator() {
+      super(IntPairWritable.class);
+    }
+    
+    @Override
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+      int firstb1 = WritableComparator.readInt(b1, s1);
+      int firstb2 = WritableComparator.readInt(b2, s2);
+      if (firstb1 < firstb2) {
+        return -1;
+      } else if (firstb1 > firstb2) {
+        return 1;
+      } else {
+        return 0;
+      }
+    }
+    
+    @Override
+    public int compare(Object o1, Object o2) {
+      int firstb1 = ((IntPairWritable) o1).getFirst();
+      int firstb2 = ((IntPairWritable) o2).getFirst();
+      if (firstb1 < firstb2) {
+        return -1;
+      }
+      if (firstb1 > firstb2) {
+        return 1;
+      }
+      return 0;
+    }
+    
+  }
+  
+  /** A wrapper class that associates pairs with frequency (Occurrences) */
+  public static class Frequency implements Comparable<Frequency>, Serializable 
{
+    
+    private final IntPairWritable pair;
+    private final double frequency;
+
+    public Frequency(IntPairWritable bigram, double frequency) {
+      this.pair = new IntPairWritable(bigram);
+      this.frequency = frequency;
+    }
+
+    public double getFrequency() {
+      return frequency;
+    }
+
+    public IntPairWritable getPair() {
+      return pair;
+    }
+
+    @Override
+    public int hashCode() {
+      return pair.hashCode() + RandomUtils.hashDouble(frequency);
+    }
+    
+    @Override
+    public boolean equals(Object right) {
+      if (!(right instanceof Frequency)) {
+        return false;
+      }
+      Frequency that = (Frequency) right;
+      return pair.equals(that.pair) && frequency == that.frequency;
+    }
+    
+    @Override
+    public int compareTo(Frequency that) {
+      if (frequency < that.frequency) {
+        return -1;
+      }
+      if (frequency > that.frequency) {
+        return 1;
+      }
+      return 0;
+    }
+    
+    @Override
+    public String toString() {
+      return pair + "\t" + frequency;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/IntegerTuple.java
----------------------------------------------------------------------
diff --git 
a/community/mahout-mr/src/main/java/org/apache/mahout/common/IntegerTuple.java 
b/community/mahout-mr/src/main/java/org/apache/mahout/common/IntegerTuple.java
new file mode 100644
index 0000000..f456d4d
--- /dev/null
+++ 
b/community/mahout-mr/src/main/java/org/apache/mahout/common/IntegerTuple.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * An Ordered List of Integers which can be used in a Hadoop Map/Reduce Job
+ * 
+ * 
+ */
+public final class IntegerTuple implements WritableComparable<IntegerTuple> {
+  
+  private List<Integer> tuple = Lists.newArrayList();
+  
+  public IntegerTuple() { }
+  
+  public IntegerTuple(Integer firstEntry) {
+    add(firstEntry);
+  }
+  
+  public IntegerTuple(Iterable<Integer> entries) {
+    for (Integer entry : entries) {
+      add(entry);
+    }
+  }
+  
+  public IntegerTuple(Integer[] entries) {
+    for (Integer entry : entries) {
+      add(entry);
+    }
+  }
+  
+  /**
+   * add an entry to the end of the list
+   * 
+   * @param entry
+   * @return true if the items get added
+   */
+  public boolean add(Integer entry) {
+    return tuple.add(entry);
+  }
+  
+  /**
+   * Fetches the string at the given location
+   * 
+   * @param index
+   * @return String value at the given location in the tuple list
+   */
+  public Integer integerAt(int index) {
+    return tuple.get(index);
+  }
+  
+  /**
+   * Replaces the string at the given index with the given newString
+   * 
+   * @param index
+   * @param newInteger
+   * @return The previous value at that location
+   */
+  public Integer replaceAt(int index, Integer newInteger) {
+    return tuple.set(index, newInteger);
+  }
+  
+  /**
+   * Fetch the list of entries from the tuple
+   * 
+   * @return a List containing the strings in the order of insertion
+   */
+  public List<Integer> getEntries() {
+    return Collections.unmodifiableList(this.tuple);
+  }
+  
+  /**
+   * Returns the length of the tuple
+   * 
+   * @return length
+   */
+  public int length() {
+    return this.tuple.size();
+  }
+  
+  @Override
+  public String toString() {
+    return tuple.toString();
+  }
+  
+  @Override
+  public int hashCode() {
+    return tuple.hashCode();
+  }
+  
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    IntegerTuple other = (IntegerTuple) obj;
+    if (tuple == null) {
+      if (other.tuple != null) {
+        return false;
+      }
+    } else if (!tuple.equals(other.tuple)) {
+      return false;
+    }
+    return true;
+  }
+  
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int len = in.readInt();
+    tuple = Lists.newArrayListWithCapacity(len);
+    for (int i = 0; i < len; i++) {
+      int data = in.readInt();
+      tuple.add(data);
+    }
+  }
+  
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(tuple.size());
+    for (Integer entry : tuple) {
+      out.writeInt(entry);
+    }
+  }
+  
+  @Override
+  public int compareTo(IntegerTuple otherTuple) {
+    int thisLength = length();
+    int otherLength = otherTuple.length();
+    int min = Math.min(thisLength, otherLength);
+    for (int i = 0; i < min; i++) {
+      int ret = this.tuple.get(i).compareTo(otherTuple.integerAt(i));
+      if (ret == 0) {
+        continue;
+      }
+      return ret;
+    }
+    if (thisLength < otherLength) {
+      return -1;
+    } else if (thisLength > otherLength) {
+      return 1;
+    } else {
+      return 0;
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/LongPair.java
----------------------------------------------------------------------
diff --git 
a/community/mahout-mr/src/main/java/org/apache/mahout/common/LongPair.java 
b/community/mahout-mr/src/main/java/org/apache/mahout/common/LongPair.java
new file mode 100644
index 0000000..5215e3a
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/LongPair.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common;
+
+import java.io.Serializable;
+
+import com.google.common.primitives.Longs;
+
+/** A simple (ordered) pair of longs. */
+public final class LongPair implements Comparable<LongPair>, Serializable {
+  
+  private final long first;
+  private final long second;
+  
+  public LongPair(long first, long second) {
+    this.first = first;
+    this.second = second;
+  }
+  
+  public long getFirst() {
+    return first;
+  }
+  
+  public long getSecond() {
+    return second;
+  }
+  
+  public LongPair swap() {
+    return new LongPair(second, first);
+  }
+  
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof LongPair)) {
+      return false;
+    }
+    LongPair otherPair = (LongPair) obj;
+    return first == otherPair.getFirst() && second == otherPair.getSecond();
+  }
+  
+  @Override
+  public int hashCode() {
+    int firstHash = Longs.hashCode(first);
+    // Flip top and bottom 16 bits; this makes the hash function probably 
different
+    // for (a,b) versus (b,a)
+    return (firstHash >>> 16 | firstHash << 16) ^ Longs.hashCode(second);
+  }
+  
+  @Override
+  public String toString() {
+    return '(' + String.valueOf(first) + ',' + second + ')';
+  }
+  
+  @Override
+  public int compareTo(LongPair o) {
+    if (first < o.getFirst()) {
+      return -1;
+    } else if (first > o.getFirst()) {
+      return 1;
+    } else {
+      return second < o.getSecond() ? -1 : second > o.getSecond() ? 1 : 0;
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/MemoryUtil.java
----------------------------------------------------------------------
diff --git 
a/community/mahout-mr/src/main/java/org/apache/mahout/common/MemoryUtil.java 
b/community/mahout-mr/src/main/java/org/apache/mahout/common/MemoryUtil.java
new file mode 100644
index 0000000..f241b53
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/MemoryUtil.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.common;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Memory utilities.
+ */
+public final class MemoryUtil {
+
+  private static final Logger log = LoggerFactory.getLogger(MemoryUtil.class);
+
+  private MemoryUtil() {
+  }
+
+  /**
+   * Logs current heap memory statistics.
+   *
+   * @see Runtime
+   */
+  public static void logMemoryStatistics() {
+    Runtime runtime = Runtime.getRuntime();
+    long freeBytes = runtime.freeMemory();
+    long maxBytes = runtime.maxMemory();
+    long totalBytes = runtime.totalMemory();
+    long usedBytes = totalBytes - freeBytes;
+    log.info("Memory (bytes): {} used, {} heap, {} max", usedBytes, totalBytes,
+             maxBytes);
+  }
+
+  private static volatile ScheduledExecutorService scheduler;
+
+  /**
+   * Constructs and starts a memory logger thread.
+   *
+   * @param rateInMillis how often memory info should be logged.
+   */
+  public static void startMemoryLogger(long rateInMillis) {
+    stopMemoryLogger();
+    scheduler = Executors.newScheduledThreadPool(1, new ThreadFactory() {
+      private final ThreadFactory delegate = Executors.defaultThreadFactory();
+
+      @Override
+      public Thread newThread(Runnable r) {
+        Thread t = delegate.newThread(r);
+        t.setDaemon(true);
+        return t;
+      }
+    });
+    Runnable memoryLoogerRunnable = new Runnable() {
+      @Override
+      public void run() {
+        logMemoryStatistics();
+      }
+    };
+    scheduler.scheduleAtFixedRate(memoryLoogerRunnable, rateInMillis, 
rateInMillis,
+        TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Constructs and starts a memory logger thread with a logging rate of 1000 
milliseconds.
+   */
+  public static void startMemoryLogger() {
+    startMemoryLogger(1000);
+  }
+
+  /**
+   * Stops the memory logger, if any, started via {@link 
#startMemoryLogger(long)} or
+   * {@link #startMemoryLogger()}.
+   */
+  public static void stopMemoryLogger() {
+    if (scheduler != null) {
+      scheduler.shutdownNow();
+      scheduler = null;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/Pair.java
----------------------------------------------------------------------
diff --git 
a/community/mahout-mr/src/main/java/org/apache/mahout/common/Pair.java 
b/community/mahout-mr/src/main/java/org/apache/mahout/common/Pair.java
new file mode 100644
index 0000000..d2ad6a1
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/Pair.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common;
+
+import java.io.Serializable;
+
+/** A simple (ordered) pair of two objects. Elements may be null. */
+public final class Pair<A,B> implements Comparable<Pair<A,B>>, Serializable {
+  
+  private final A first;
+  private final B second;
+  
+  public Pair(A first, B second) {
+    this.first = first;
+    this.second = second;
+  }
+  
+  public A getFirst() {
+    return first;
+  }
+  
+  public B getSecond() {
+    return second;
+  }
+  
+  public Pair<B, A> swap() {
+    return new Pair<>(second, first);
+  }
+
+  public static <A,B> Pair<A,B> of(A a, B b) {
+    return new Pair<>(a, b);
+  }
+  
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof Pair<?, ?>)) {
+      return false;
+    }
+    Pair<?, ?> otherPair = (Pair<?, ?>) obj;
+    return isEqualOrNulls(first, otherPair.getFirst())
+        && isEqualOrNulls(second, otherPair.getSecond());
+  }
+  
+  private static boolean isEqualOrNulls(Object obj1, Object obj2) {
+    return obj1 == null ? obj2 == null : obj1.equals(obj2);
+  }
+  
+  @Override
+  public int hashCode() {
+    int firstHash = hashCodeNull(first);
+    // Flip top and bottom 16 bits; this makes the hash function probably 
different
+    // for (a,b) versus (b,a)
+    return (firstHash >>> 16 | firstHash << 16) ^ hashCodeNull(second);
+  }
+  
+  private static int hashCodeNull(Object obj) {
+    return obj == null ? 0 : obj.hashCode();
+  }
+  
+  @Override
+  public String toString() {
+    return '(' + String.valueOf(first) + ',' + second + ')';
+  }
+
+  /**
+   * Defines an ordering on pairs that sorts by first value's natural 
ordering, ascending,
+   * and then by second value's natural ordering.
+   *
+   * @throws ClassCastException if types are not actually {@link Comparable}
+   */
+  @Override
+  public int compareTo(Pair<A,B> other) {
+    Comparable<A> thisFirst = (Comparable<A>) first;
+    A thatFirst = other.getFirst();
+    int compare = thisFirst.compareTo(thatFirst);
+    if (compare != 0) {
+      return compare;
+    }
+    Comparable<B> thisSecond = (Comparable<B>) second;
+    B thatSecond = other.getSecond();
+    return thisSecond.compareTo(thatSecond);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/Parameters.java
----------------------------------------------------------------------
diff --git 
a/community/mahout-mr/src/main/java/org/apache/mahout/common/Parameters.java 
b/community/mahout-mr/src/main/java/org/apache/mahout/common/Parameters.java
new file mode 100644
index 0000000..e74c534
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/common/Parameters.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common;
+
+import java.io.IOException;
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.util.GenericsUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Parameters {
+  
+  private static final Logger log = LoggerFactory.getLogger(Parameters.class);
+  
+  private Map<String,String> params = Maps.newHashMap();
+
+  public Parameters() {
+
+  }
+
+  public Parameters(String serializedString) throws IOException {
+    this(parseParams(serializedString));
+  }
+
+  protected Parameters(Map<String,String> params) {
+    this.params = params;
+  }
+
+  public String get(String key) {
+    return params.get(key);
+  }
+  
+  public String get(String key, String defaultValue) {
+    String ret = params.get(key);
+    return ret == null ? defaultValue : ret;
+  }
+  
+  public void set(String key, String value) {
+    params.put(key, value);
+  }
+
+  public int getInt(String key, int defaultValue) {
+    String ret = params.get(key);
+    return ret == null ? defaultValue : Integer.parseInt(ret);
+  }
+
+  @Override
+  public String toString() {
+    Configuration conf = new Configuration();
+    conf.set("io.serializations",
+             "org.apache.hadoop.io.serializer.JavaSerialization,"
+             + "org.apache.hadoop.io.serializer.WritableSerialization");
+    DefaultStringifier<Map<String,String>> mapStringifier = new 
DefaultStringifier<>(conf,
+        GenericsUtil.getClass(params));
+    try {
+      return mapStringifier.toString(params);
+    } catch (IOException e) {
+      log.info("Encountered IOException while deserializing returning empty 
string", e);
+      return "";
+    }
+    
+  }
+  
+  public String print() {
+    return params.toString();
+  }
+
+  public static Map<String,String> parseParams(String serializedString) throws 
IOException {
+    Configuration conf = new Configuration();
+    conf.set("io.serializations",
+             "org.apache.hadoop.io.serializer.JavaSerialization,"
+             + "org.apache.hadoop.io.serializer.WritableSerialization");
+    Map<String,String> params = Maps.newHashMap();
+    DefaultStringifier<Map<String,String>> mapStringifier = new 
DefaultStringifier<>(conf,
+        GenericsUtil.getClass(params));
+    return mapStringifier.fromString(serializedString);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/StringTuple.java
----------------------------------------------------------------------
diff --git 
a/community/mahout-mr/src/main/java/org/apache/mahout/common/StringTuple.java 
b/community/mahout-mr/src/main/java/org/apache/mahout/common/StringTuple.java
new file mode 100644
index 0000000..0de1a4a
--- /dev/null
+++ 
b/community/mahout-mr/src/main/java/org/apache/mahout/common/StringTuple.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * An Ordered List of Strings which can be used in a Hadoop Map/Reduce Job
+ */
+public final class StringTuple implements WritableComparable<StringTuple> {
+  
+  private List<String> tuple = Lists.newArrayList();
+  
+  public StringTuple() { }
+  
+  public StringTuple(String firstEntry) {
+    add(firstEntry);
+  }
+  
+  public StringTuple(Iterable<String> entries) {
+    for (String entry : entries) {
+      add(entry);
+    }
+  }
+  
+  public StringTuple(String[] entries) {
+    for (String entry : entries) {
+      add(entry);
+    }
+  }
+  
+  /**
+   * add an entry to the end of the list
+   * 
+   * @param entry
+   * @return true if the items get added
+   */
+  public boolean add(String entry) {
+    return tuple.add(entry);
+  }
+  
+  /**
+   * Fetches the string at the given location
+   * 
+   * @param index
+   * @return String value at the given location in the tuple list
+   */
+  public String stringAt(int index) {
+    return tuple.get(index);
+  }
+  
+  /**
+   * Replaces the string at the given index with the given newString
+   * 
+   * @param index
+   * @param newString
+   * @return The previous value at that location
+   */
+  public String replaceAt(int index, String newString) {
+    return tuple.set(index, newString);
+  }
+  
+  /**
+   * Fetch the list of entries from the tuple
+   * 
+   * @return a List containing the strings in the order of insertion
+   */
+  public List<String> getEntries() {
+    return Collections.unmodifiableList(this.tuple);
+  }
+  
+  /**
+   * Returns the length of the tuple
+   * 
+   * @return length
+   */
+  public int length() {
+    return this.tuple.size();
+  }
+  
+  @Override
+  public String toString() {
+    return tuple.toString();
+  }
+  
+  @Override
+  public int hashCode() {
+    return tuple.hashCode();
+  }
+  
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    StringTuple other = (StringTuple) obj;
+    if (tuple == null) {
+      if (other.tuple != null) {
+        return false;
+      }
+    } else if (!tuple.equals(other.tuple)) {
+      return false;
+    }
+    return true;
+  }
+  
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int len = in.readInt();
+    tuple = Lists.newArrayListWithCapacity(len);
+    Text value = new Text();
+    for (int i = 0; i < len; i++) {
+      value.readFields(in);
+      tuple.add(value.toString());
+    }
+  }
+  
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(tuple.size());
+    Text value = new Text();
+    for (String entry : tuple) {
+      value.set(entry);
+      value.write(out);
+    }
+  }
+  
+  @Override
+  public int compareTo(StringTuple otherTuple) {
+    int thisLength = length();
+    int otherLength = otherTuple.length();
+    int min = Math.min(thisLength, otherLength);
+    for (int i = 0; i < min; i++) {
+      int ret = this.tuple.get(i).compareTo(otherTuple.stringAt(i));
+      if (ret != 0) {
+        return ret;
+      }
+    }
+    if (thisLength < otherLength) {
+      return -1;
+    } else if (thisLength > otherLength) {
+      return 1;
+    } else {
+      return 0;
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/StringUtils.java
----------------------------------------------------------------------
diff --git 
a/community/mahout-mr/src/main/java/org/apache/mahout/common/StringUtils.java 
b/community/mahout-mr/src/main/java/org/apache/mahout/common/StringUtils.java
new file mode 100644
index 0000000..a064596
--- /dev/null
+++ 
b/community/mahout-mr/src/main/java/org/apache/mahout/common/StringUtils.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common;
+
+import java.util.regex.Pattern;
+
+import com.thoughtworks.xstream.XStream;
+
+/**
+ * Offers two methods to convert an object to a string representation and 
restore the object given its string
+ * representation. Should use Hadoop Stringifier whenever available.
+ */
+public final class StringUtils {
+  
+  private static final XStream XSTREAM = new XStream();
+  private static final Pattern NEWLINE_PATTERN = Pattern.compile("\n");
+  private static final Pattern XMLRESERVED = 
Pattern.compile("\"|\\&|\\<|\\>|\'");
+
+  private StringUtils() {
+  // do nothing
+  }
+  
+  /**
+   * Converts the object to a one-line string representation
+   * 
+   * @param obj
+   *          the object to convert
+   * @return the string representation of the object
+   */
+  public static String toString(Object obj) {
+    return NEWLINE_PATTERN.matcher(XSTREAM.toXML(obj)).replaceAll("");
+  }
+  
+  /**
+   * Restores the object from its string representation.
+   * 
+   * @param str
+   *          the string representation of the object
+   * @return restored object
+   */
+  public static <T> T fromString(String str) {
+    return (T) XSTREAM.fromXML(str);
+  }
+
+  public static String escapeXML(CharSequence input) {
+    return XMLRESERVED.matcher(input).replaceAll("_");
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/common/TimingStatistics.java
----------------------------------------------------------------------
diff --git 
a/community/mahout-mr/src/main/java/org/apache/mahout/common/TimingStatistics.java
 
b/community/mahout-mr/src/main/java/org/apache/mahout/common/TimingStatistics.java
new file mode 100644
index 0000000..5ee2066
--- /dev/null
+++ 
b/community/mahout-mr/src/main/java/org/apache/mahout/common/TimingStatistics.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common;
+
+import java.io.Serializable;
+import java.text.DecimalFormat;
+
+public final class TimingStatistics implements Serializable {
+  private static final DecimalFormat DF = new DecimalFormat("#.##");
+  private int nCalls;
+  private long minTime;
+  private long maxTime;
+  private long sumTime;
+  private long leadSumTime;
+  private double sumSquaredTime;
+
+
+  /** Creates a new instance of CallStats */
+  public TimingStatistics() { }
+
+  public TimingStatistics(int nCalls, long minTime, long maxTime, long 
sumTime, double sumSquaredTime) {
+    this.nCalls = nCalls;
+    this.minTime = minTime;
+    this.maxTime = maxTime;
+    this.sumTime = sumTime;
+    this.sumSquaredTime = sumSquaredTime;
+  }
+
+  public synchronized int getNCalls() {
+    return nCalls;
+  }
+
+  public synchronized long getMinTime() {
+    return Math.max(0, minTime);
+  }
+
+  public synchronized long getMaxTime() {
+    return maxTime;
+  }
+
+  public synchronized long getSumTime() {
+    return sumTime;
+  }
+
+  public synchronized double getSumSquaredTime() {
+    return sumSquaredTime;
+  }
+
+  public synchronized long getMeanTime() {
+    return nCalls == 0 ? 0 : sumTime / nCalls;
+  }
+
+  public synchronized long getStdDevTime() {
+    if (nCalls == 0) {
+      return 0;
+    }
+    double mean = getMeanTime();
+    double meanSquared = mean * mean;
+    double meanOfSquares = sumSquaredTime / nCalls;
+    double variance = meanOfSquares - meanSquared;
+    if (variance < 0) {
+      return 0; // might happen due to rounding error
+    }
+    return (long) Math.sqrt(variance);
+  }
+
+  @Override
+  public synchronized String toString() {
+    return '\n'
+        + "nCalls = " + nCalls + ";\n"
+        + "sum    = " + DF.format(sumTime / 1000000000.0) + "s;\n"
+        + "min    = " + DF.format(minTime / 1000000.0) + "ms;\n"
+        + "max    = " + DF.format(maxTime / 1000000.0) + "ms;\n"
+        + "mean   = " + DF.format(getMeanTime() / 1000.0) + "us;\n"
+        + "stdDev = " + DF.format(getStdDevTime() / 1000.0) + "us;";
+  }
+
+  /** Ignores counting the performance metrics until leadTimeIsFinished The 
caller should enough time for the JIT
+   *  to warm up. */
+  public Call newCall(long leadTimeUsec) {
+    if (leadSumTime > leadTimeUsec) {
+      return new Call();
+    } else {
+      return new LeadTimeCall();
+    }
+  }
+
+  /** Ignores counting the performance metrics. The caller should enough time 
for the JIT to warm up. */
+  public final class LeadTimeCall extends Call {
+
+    private LeadTimeCall() { }
+
+    @Override
+    public void end() {
+      long elapsed = System.nanoTime() - startTime;
+      synchronized (TimingStatistics.this) {
+        leadSumTime += elapsed;
+      }
+    }
+
+    @Override
+    public boolean end(long sumMaxUsec) {
+      end();
+      return false;
+    }
+  }
+
+  /**
+   * A call object that can update performance metrics.
+   */
+  public class Call {
+    protected final long startTime = System.nanoTime();
+
+    private Call() { }
+
+    public void end() {
+      long elapsed = System.nanoTime() - startTime;
+      synchronized (TimingStatistics.this) {
+        nCalls++;
+        if (elapsed < minTime || nCalls == 1) {
+          minTime = elapsed;
+        }
+        if (elapsed > maxTime) {
+          maxTime = elapsed;
+        }
+        sumTime += elapsed;
+        sumSquaredTime += elapsed * elapsed;
+      }
+    }
+
+    /**
+     * Returns true if the sumTime as reached this limit;
+     */
+    public boolean end(long sumMaxUsec) {
+      end();
+      return sumMaxUsec < sumTime;
+    }
+  }
+}

Reply via email to