http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/AbstractJob.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/AbstractJob.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/AbstractJob.java new file mode 100644 index 0000000..8072466 --- /dev/null +++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/AbstractJob.java @@ -0,0 +1,648 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.common; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicInteger; + +import com.google.common.base.Preconditions; +import org.apache.commons.cli2.CommandLine; +import org.apache.commons.cli2.Group; +import org.apache.commons.cli2.Option; +import org.apache.commons.cli2.OptionException; +import org.apache.commons.cli2.builder.ArgumentBuilder; +import org.apache.commons.cli2.builder.DefaultOptionBuilder; +import org.apache.commons.cli2.builder.GroupBuilder; +import org.apache.commons.cli2.commandline.Parser; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.analysis.standard.StandardAnalyzer; +import org.apache.mahout.common.commandline.DefaultOptionCreator; +import org.apache.mahout.common.lucene.AnalyzerUtils; +import org.apache.mahout.math.VectorWritable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * <p>Superclass of many Mahout Hadoop "jobs". 
A job drives configuration and launch of one or + * more maps and reduces in order to accomplish some task.</p> + * + * <p>Command line arguments available to all subclasses are:</p> + * + * <ul> + * <li>--tempDir (path): Specifies a directory where the job may place temp files + * (default "temp")</li> + * <li>--help: Show help message</li> + * </ul> + * + * <p>In addition, note some key command line parameters that are parsed by Hadoop, which jobs + * may need to set:</p> + * + * <ul> + * <li>-Dmapred.job.name=(name): Sets the Hadoop task names. It will be suffixed by + * the mapper and reducer class names</li> + * <li>-Dmapred.output.compress={true,false}: Compress final output (default true)</li> + * <li>-Dmapred.input.dir=(path): input file, or directory containing input files (required)</li> + * <li>-Dmapred.output.dir=(path): path to write output files (required)</li> + * </ul> + * + * <p>Note that because of how Hadoop parses arguments, all "-D" arguments must appear before all other + * arguments.</p> + */ +public abstract class AbstractJob extends Configured implements Tool { + + private static final Logger log = LoggerFactory.getLogger(AbstractJob.class); + + /** option used to specify the input path */ + private Option inputOption; + + /** option used to specify the output path */ + private Option outputOption; + + /** input path, populated by {@link #parseArguments(String[])} */ + protected Path inputPath; + protected File inputFile; //the input represented as a file + + /** output path, populated by {@link #parseArguments(String[])} */ + protected Path outputPath; + protected File outputFile; //the output represented as a file + + /** temp path, populated by {@link #parseArguments(String[])} */ + protected Path tempPath; + + protected Map<String, List<String>> argMap; + + /** internal list of options that have been added */ + private final List<Option> options; + private Group group; + + protected AbstractJob() { + options = new LinkedList<>(); + } + + /** Returns the input path established by a call to {@link #parseArguments(String[])}. + * The source of the path may be an input option added using {@link #addInputOption()} + * or it may be the value of the {@code mapred.input.dir} configuration + * property. + */ + protected Path getInputPath() { + return inputPath; + } + + /** Returns the output path established by a call to {@link #parseArguments(String[])}. + * The source of the path may be an output option added using {@link #addOutputOption()} + * or it may be the value of the {@code mapred.output.dir} configuration + * property.
+ */ + protected Path getOutputPath() { + return outputPath; + } + + protected Path getOutputPath(String path) { + return new Path(outputPath, path); + } + + protected File getInputFile() { + return inputFile; + } + + protected File getOutputFile() { + return outputFile; + } + + + protected Path getTempPath() { + return tempPath; + } + + protected Path getTempPath(String directory) { + return new Path(tempPath, directory); + } + + @Override + public Configuration getConf() { + Configuration result = super.getConf(); + if (result == null) { + return new Configuration(); + } + return result; + } + + /** Add an option with no argument whose presence can be checked for using + * the {@code containsKey} method on the map returned by {@link #parseArguments(String[])}. + */ + protected void addFlag(String name, String shortName, String description) { + options.add(buildOption(name, shortName, description, false, false, null)); + } + + /** Add an option to the set of options this job will parse when + * {@link #parseArguments(String[])} is called. This option has an argument + * with null as its default value. + */ + protected void addOption(String name, String shortName, String description) { + options.add(buildOption(name, shortName, description, true, false, null)); + } + + /** Add an option to the set of options this job will parse when + * {@link #parseArguments(String[])} is called. + * + * @param required if true, {@link #parseArguments(String[])} will + * fail with an error and usage message if this option is not specified + * on the command line. + */ + protected void addOption(String name, String shortName, String description, boolean required) { + options.add(buildOption(name, shortName, description, true, required, null)); + } + + /** Add an option to the set of options this job will parse when + * {@link #parseArguments(String[])} is called. If this option is not + * specified on the command line, the default value will be + * used. + * + * @param defaultValue the default argument value if this argument is not + * found on the command-line. null is allowed. + */ + protected void addOption(String name, String shortName, String description, String defaultValue) { + options.add(buildOption(name, shortName, description, true, false, defaultValue)); + } + + /** Add an arbitrary option to the set of options this job will parse when + * {@link #parseArguments(String[])} is called. If this option has no + * argument, use {@code containsKey} on the map returned by + * {@code parseArguments} to check for its presence. Otherwise, the + * string value of the option will be placed in the map using a key + * equal to this option's long name preceded by '--'. + * @return the option added. + */ + protected Option addOption(Option option) { + options.add(option); + return option; + } + + protected Group getGroup() { + return group; + } + + /** Add the default input directory option, '-i', which takes a directory + * name as an argument. When {@link #parseArguments(String[])} is + * called, the inputPath will be set based upon the value for this option. + * If this method is called, the input is required. + */ + protected void addInputOption() { + this.inputOption = addOption(DefaultOptionCreator.inputOption().create()); + } + + /** Add the default output directory option, '-o', which takes a directory + * name as an argument. When {@link #parseArguments(String[])} is + * called, the outputPath will be set based upon the value for this option.
+ * If this method is called, the output is required. + */ + protected void addOutputOption() { + this.outputOption = addOption(DefaultOptionCreator.outputOption().create()); + } + + /** Build an option with the given parameters. Name and description are + * required. + * + * @param name the long name of the option prefixed with '--' on the command-line + * @param shortName the short name of the option, prefixed with '-' on the command-line + * @param description description of the option displayed in the help message + * @param hasArg true if the option has an argument. + * @param required true if the option is required. + * @param defaultValue default argument value, can be null. + * @return the option. + */ + protected static Option buildOption(String name, + String shortName, + String description, + boolean hasArg, + boolean required, + String defaultValue) { + + return buildOption(name, shortName, description, hasArg, 1, 1, required, defaultValue); + } + + protected static Option buildOption(String name, + String shortName, + String description, + boolean hasArg, int min, int max, + boolean required, + String defaultValue) { + + DefaultOptionBuilder optBuilder = new DefaultOptionBuilder().withLongName(name).withDescription(description) + .withRequired(required); + + if (shortName != null) { + optBuilder.withShortName(shortName); + } + + if (hasArg) { + ArgumentBuilder argBuilder = new ArgumentBuilder().withName(name).withMinimum(min).withMaximum(max); + + if (defaultValue != null) { + argBuilder = argBuilder.withDefault(defaultValue); + } + + optBuilder.withArgument(argBuilder.create()); + } + + return optBuilder.create(); + } + + /** + * @param name The name of the option + * @return the {@link org.apache.commons.cli2.Option} with the name, else null + */ + protected Option getCLIOption(String name) { + for (Option option : options) { + if (option.getPreferredName().equals(name)) { + return option; + } + } + return null; + } + + /** Parse the arguments specified based on the options defined using the + * various {@code addOption} methods. If {@code -h} is specified or an + * exception is encountered, print help and return null. Has the + * side effect of setting inputPath and outputPath + * if {@code addInputOption} or {@code addOutputOption} + * or {@code mapred.input.dir} or {@code mapred.output.dir} + * are present in the Configuration. + * + * @return a {@code Map<String,List<String>>} containing options and their argument values. + * The presence of a flag can be tested using {@code containsKey}, while + * argument values can be retrieved using {@code get(optionName)}. The + * names used for keys are the option name parameter prefixed by '--'. + * + * @see #parseArguments(String[], boolean, boolean) -- passes in false, false for the optional args. + */ + public Map<String, List<String>> parseArguments(String[] args) throws IOException { + return parseArguments(args, false, false); + } + + /** + * + * @param args The args to parse + * @param inputOptional if true, the input option, if set, need not be present on the command line. + * If false and an input option has been added, throw an error when no input is specified + * @param outputOptional if true, the output option, if set, need not be present on the command line. + * If false and an output option has been added, throw an error when no output is specified + * @return the args parsed into a map.
+ */ + public Map<String, List<String>> parseArguments(String[] args, boolean inputOptional, boolean outputOptional) + throws IOException { + Option helpOpt = addOption(DefaultOptionCreator.helpOption()); + addOption("tempDir", null, "Intermediate output directory", "temp"); + addOption("startPhase", null, "First phase to run", "0"); + addOption("endPhase", null, "Last phase to run", String.valueOf(Integer.MAX_VALUE)); + + GroupBuilder gBuilder = new GroupBuilder().withName("Job-Specific Options:"); + + for (Option opt : options) { + gBuilder = gBuilder.withOption(opt); + } + + group = gBuilder.create(); + + CommandLine cmdLine; + try { + Parser parser = new Parser(); + parser.setGroup(group); + parser.setHelpOption(helpOpt); + cmdLine = parser.parse(args); + + } catch (OptionException e) { + log.error(e.getMessage()); + CommandLineUtil.printHelpWithGenericOptions(group, e); + return null; + } + + if (cmdLine.hasOption(helpOpt)) { + CommandLineUtil.printHelpWithGenericOptions(group); + return null; + } + + try { + parseDirectories(cmdLine, inputOptional, outputOptional); + } catch (IllegalArgumentException e) { + log.error(e.getMessage()); + CommandLineUtil.printHelpWithGenericOptions(group); + return null; + } + + argMap = new TreeMap<>(); + maybePut(argMap, cmdLine, this.options.toArray(new Option[this.options.size()])); + + this.tempPath = new Path(getOption("tempDir")); + + if (!hasOption("quiet")) { + log.info("Command line arguments: {}", argMap); + } + return argMap; + } + + /** + * Build the option key (--name) from the option name + */ + public static String keyFor(String optionName) { + return "--" + optionName; + } + + /** + * @return the requested option, or null if it has not been specified + */ + public String getOption(String optionName) { + List<String> list = argMap.get(keyFor(optionName)); + if (list != null && !list.isEmpty()) { + return list.get(0); + } + return null; + } + + /** + * Get the option, else the default + * @param optionName The name of the option to look up, without the -- + * @param defaultVal The default value. + * @return The requested option, else the default value if it doesn't exist + */ + public String getOption(String optionName, String defaultVal) { + String res = getOption(optionName); + if (res == null) { + res = defaultVal; + } + return res; + } + + public int getInt(String optionName) { + return Integer.parseInt(getOption(optionName)); + } + + public int getInt(String optionName, int defaultVal) { + return Integer.parseInt(getOption(optionName, String.valueOf(defaultVal))); + } + + public float getFloat(String optionName) { + return Float.parseFloat(getOption(optionName)); + } + + public float getFloat(String optionName, float defaultVal) { + return Float.parseFloat(getOption(optionName, String.valueOf(defaultVal))); + } + + /** + * Options can occur multiple times, so return the list + * @param optionName The unadorned (no "--" prefixing it) option name + * @return The values, else null. 
If the option is present, but has no values, then the result will be an + * empty list (Collections.emptyList()) + */ + public List<String> getOptions(String optionName) { + return argMap.get(keyFor(optionName)); + } + + /** + * @return if the requested option has been specified + */ + public boolean hasOption(String optionName) { + return argMap.containsKey(keyFor(optionName)); + } + + + /** + * Get the cardinality of the input vectors + * + * @param matrix the path to a SequenceFile whose values are {@link VectorWritable}s + * @return the cardinality of the vector + */ + public int getDimensions(Path matrix) throws IOException { + try (SequenceFile.Reader reader = new SequenceFile.Reader(FileSystem.get(getConf()), matrix, getConf())) { + Writable row = ClassUtils.instantiateAs(reader.getKeyClass().asSubclass(Writable.class), Writable.class); + Preconditions.checkArgument(reader.getValueClass().equals(VectorWritable.class), + "value type of sequencefile must be a VectorWritable"); + + VectorWritable vectorWritable = new VectorWritable(); + boolean hasAtLeastOneRow = reader.next(row, vectorWritable); + Preconditions.checkState(hasAtLeastOneRow, "matrix must have at least one row"); + return vectorWritable.get().size(); + } + } + + /** + * Obtain input and output directories from command-line options or hadoop + * properties. If {@code addInputOption} or {@code addOutputOption} + * has been called, this method will throw an {@code IllegalArgumentException} if + * no source (command-line or property) for that value is present. + * Otherwise, {@code inputPath} or {@code outputPath} will be + * non-null only if specified as a hadoop property. Command-line options + * take precedence over hadoop properties. + * + * @throws IllegalArgumentException if either inputOption is present, + * and neither {@code --input} nor {@code -Dmapred.input.dir} are + * specified, or outputOption is present and neither {@code --output} + * nor {@code -Dmapred.output.dir} are specified. + */ + protected void parseDirectories(CommandLine cmdLine, boolean inputOptional, boolean outputOptional) { + + Configuration conf = getConf(); + + if (inputOption != null && cmdLine.hasOption(inputOption)) { + this.inputPath = new Path(cmdLine.getValue(inputOption).toString()); + this.inputFile = new File(cmdLine.getValue(inputOption).toString()); + } + if (inputPath == null && conf.get("mapred.input.dir") != null) { + this.inputPath = new Path(conf.get("mapred.input.dir")); + } + + if (outputOption != null && cmdLine.hasOption(outputOption)) { + this.outputPath = new Path(cmdLine.getValue(outputOption).toString()); + this.outputFile = new File(cmdLine.getValue(outputOption).toString()); + } + if (outputPath == null && conf.get("mapred.output.dir") != null) { + this.outputPath = new Path(conf.get("mapred.output.dir")); + } + + Preconditions.checkArgument(inputOptional || inputOption == null || inputPath != null, + "No input specified: --input or -Dmapred.input.dir must be provided to specify the input directory"); + Preconditions.checkArgument(outputOptional || outputOption == null || outputPath != null, + "No output specified: --output or -Dmapred.output.dir must be provided to specify the output directory"); + } + + protected static void maybePut(Map<String, List<String>> args, CommandLine cmdLine, Option... opt) { + for (Option o : opt) { + + // the option appeared on the command-line, or it has a value + // (which is likely a default value).
+ if (cmdLine.hasOption(o) || cmdLine.getValue(o) != null + || (cmdLine.getValues(o) != null && !cmdLine.getValues(o).isEmpty())) { + + // nulls are ok, for cases where options are simple flags. + List<?> vo = cmdLine.getValues(o); + if (vo != null && !vo.isEmpty()) { + List<String> vals = new ArrayList<>(); + for (Object o1 : vo) { + vals.add(o1.toString()); + } + args.put(o.getPreferredName(), vals); + } else { + args.put(o.getPreferredName(), null); + } + } + } + } + + /** + * + * @param args The input argument map + * @param optName The adorned (including "--") option name + * @return The first value in the match, else null + */ + public static String getOption(Map<String, List<String>> args, String optName) { + List<String> res = args.get(optName); + if (res != null && !res.isEmpty()) { + return res.get(0); + } + return null; + } + + + protected static boolean shouldRunNextPhase(Map<String, List<String>> args, AtomicInteger currentPhase) { + int phase = currentPhase.getAndIncrement(); + String startPhase = getOption(args, "--startPhase"); + String endPhase = getOption(args, "--endPhase"); + boolean phaseSkipped = (startPhase != null && phase < Integer.parseInt(startPhase)) + || (endPhase != null && phase > Integer.parseInt(endPhase)); + if (phaseSkipped) { + log.info("Skipping phase {}", phase); + } + return !phaseSkipped; + } + + protected Job prepareJob(Path inputPath, + Path outputPath, + Class<? extends InputFormat> inputFormat, + Class<? extends Mapper> mapper, + Class<? extends Writable> mapperKey, + Class<? extends Writable> mapperValue, + Class<? extends OutputFormat> outputFormat) throws IOException { + return prepareJob(inputPath, outputPath, inputFormat, mapper, mapperKey, mapperValue, outputFormat, null); + + } + protected Job prepareJob(Path inputPath, + Path outputPath, + Class<? extends InputFormat> inputFormat, + Class<? extends Mapper> mapper, + Class<? extends Writable> mapperKey, + Class<? extends Writable> mapperValue, + Class<? extends OutputFormat> outputFormat, + String jobname) throws IOException { + + Job job = HadoopUtil.prepareJob(inputPath, outputPath, + inputFormat, mapper, mapperKey, mapperValue, outputFormat, getConf()); + + String name = + jobname != null ? jobname : HadoopUtil.getCustomJobName(getClass().getSimpleName(), job, mapper, Reducer.class); + + job.setJobName(name); + return job; + + } + + protected Job prepareJob(Path inputPath, Path outputPath, Class<? extends Mapper> mapper, + Class<? extends Writable> mapperKey, Class<? extends Writable> mapperValue, Class<? extends Reducer> reducer, + Class<? extends Writable> reducerKey, Class<? extends Writable> reducerValue) throws IOException { + return prepareJob(inputPath, outputPath, SequenceFileInputFormat.class, mapper, mapperKey, mapperValue, reducer, + reducerKey, reducerValue, SequenceFileOutputFormat.class); + } + + protected Job prepareJob(Path inputPath, + Path outputPath, + Class<? extends InputFormat> inputFormat, + Class<? extends Mapper> mapper, + Class<? extends Writable> mapperKey, + Class<? extends Writable> mapperValue, + Class<? extends Reducer> reducer, + Class<? extends Writable> reducerKey, + Class<? extends Writable> reducerValue, + Class<? 
extends OutputFormat> outputFormat) throws IOException { + Job job = HadoopUtil.prepareJob(inputPath, outputPath, + inputFormat, mapper, mapperKey, mapperValue, reducer, reducerKey, reducerValue, outputFormat, getConf()); + job.setJobName(HadoopUtil.getCustomJobName(getClass().getSimpleName(), job, mapper, Reducer.class)); + return job; + } + + /** + * Necessary to make jobs with a combined input path work on Amazon S3; hopefully this + * becomes obsolete once MultipleInputs is available again. + */ + public static void setS3SafeCombinedInputPath(Job job, Path referencePath, Path inputPathOne, Path inputPathTwo) + throws IOException { + FileSystem fs = FileSystem.get(referencePath.toUri(), job.getConfiguration()); + FileInputFormat.setInputPaths(job, inputPathOne.makeQualified(fs), inputPathTwo.makeQualified(fs)); + } + + protected Class<? extends Analyzer> getAnalyzerClassFromOption() throws ClassNotFoundException { + Class<? extends Analyzer> analyzerClass = StandardAnalyzer.class; + if (hasOption(DefaultOptionCreator.ANALYZER_NAME_OPTION)) { + String className = getOption(DefaultOptionCreator.ANALYZER_NAME_OPTION); + analyzerClass = Class.forName(className).asSubclass(Analyzer.class); + // try instantiating it, because there isn't any point in setting it if + // you can't instantiate it + //ClassUtils.instantiateAs(analyzerClass, Analyzer.class); + AnalyzerUtils.createAnalyzer(analyzerClass); + } + return analyzerClass; + } + + /** + * Overrides the base implementation to install the Oozie action configuration resource + * into the provided Configuration object; note that ToolRunner calls setConf on the Tool + * before it invokes run. + */ + @Override + public void setConf(Configuration conf) { + super.setConf(conf); + + // If running in an Oozie workflow as a Java action, need to add the + // Configuration resource provided by Oozie to this job's config. + String oozieActionConfXml = System.getProperty("oozie.action.conf.xml"); + if (oozieActionConfXml != null && conf != null) { + conf.addResource(new Path("file:///", oozieActionConfXml)); + log.info("Added Oozie action Configuration resource {} to the Hadoop Configuration", oozieActionConfXml); + } + } +}
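For context, a minimal usage sketch of the subclassing pattern described in AbstractJob's javadoc above. This is illustrative only and not part of the patch: the ExampleJob class, the pass-through mapper/reducer, and the "maxIterations" option are hypothetical names; the AbstractJob and Hadoop APIs used are the ones defined in this commit.

package org.apache.mahout.example; // hypothetical package, for illustration only

import java.util.List;
import java.util.Map;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.common.AbstractJob;
import org.apache.mahout.math.VectorWritable;

public class ExampleJob extends AbstractJob {

  // Identity pass-throughs; concrete subclasses are needed because
  // prepareJob() rejects the raw Mapper.class/Reducer.class when locating
  // the job jar.
  public static class PassThroughMapper
      extends Mapper<Text, VectorWritable, Text, VectorWritable> {}

  public static class PassThroughReducer
      extends Reducer<Text, VectorWritable, Text, VectorWritable> {}

  @Override
  public int run(String[] args) throws Exception {
    addInputOption();   // registers the required --input / -i option
    addOutputOption();  // registers the required --output / -o option
    addOption("maxIterations", "mi", "maximum number of iterations", "10");

    Map<String, List<String>> parsedArgs = parseArguments(args);
    if (parsedArgs == null) {
      return -1;  // --help was requested or parsing failed
    }

    // Argument values are keyed by "--" + option name, e.g. "--maxIterations".
    int maxIterations = getInt("maxIterations");

    // The 8-argument prepareJob() defaults to SequenceFile input/output formats.
    Job job = prepareJob(getInputPath(), getOutputPath(),
        PassThroughMapper.class, Text.class, VectorWritable.class,
        PassThroughReducer.class, Text.class, VectorWritable.class);
    job.getConfiguration().setInt("example.max.iterations", maxIterations);
    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    ToolRunner.run(new ExampleJob(), args);
  }
}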
http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/ClassUtils.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/ClassUtils.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/ClassUtils.java new file mode 100644 index 0000000..8052ef1 --- /dev/null +++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/ClassUtils.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.common; + +import java.lang.reflect.InvocationTargetException; + +public final class ClassUtils { + + private ClassUtils() {} + + public static <T> T instantiateAs(String classname, Class<T> asSubclassOfClass) { + try { + return instantiateAs(Class.forName(classname).asSubclass(asSubclassOfClass), asSubclassOfClass); + } catch (ClassNotFoundException e) { + throw new IllegalStateException(e); + } + } + + public static <T> T instantiateAs(String classname, Class<T> asSubclassOfClass, Class<?>[] params, Object[] args) { + try { + return instantiateAs(Class.forName(classname).asSubclass(asSubclassOfClass), asSubclassOfClass, params, args); + } catch (ClassNotFoundException e) { + throw new IllegalStateException(e); + } + } + + public static <T> T instantiateAs(Class<? extends T> clazz, + Class<T> asSubclassOfClass, + Class<?>[] params, + Object[] args) { + try { + return clazz.asSubclass(asSubclassOfClass).getConstructor(params).newInstance(args); + } catch (InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException ie) { + throw new IllegalStateException(ie); + } + } + + + public static <T> T instantiateAs(Class<? extends T> clazz, Class<T> asSubclassOfClass) { + try { + return clazz.asSubclass(asSubclassOfClass).getConstructor().newInstance(); + } catch (InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException ie) { + throw new IllegalStateException(ie); + } + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/CommandLineUtil.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/CommandLineUtil.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/CommandLineUtil.java new file mode 100644 index 0000000..ac4ab88 --- /dev/null +++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/CommandLineUtil.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.common; + +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; + +import org.apache.commons.cli2.Group; +import org.apache.commons.cli2.OptionException; +import org.apache.commons.cli2.util.HelpFormatter; +import org.apache.commons.io.Charsets; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.GenericOptionsParser; + +public final class CommandLineUtil { + + private CommandLineUtil() { } + + public static void printHelp(Group group) { + HelpFormatter formatter = new HelpFormatter(); + formatter.setGroup(group); + formatter.print(); + } + + /** + * Print the options supported by {@code GenericOptionsParser}, in addition + * to the job-specific options passed in as the group parameter. + * + * @param group job-specific command-line options. + */ + public static void printHelpWithGenericOptions(Group group) throws IOException { + new GenericOptionsParser(new Configuration(), new org.apache.commons.cli.Options(), new String[0]); + PrintWriter pw = new PrintWriter(new OutputStreamWriter(System.out, Charsets.UTF_8), true); + HelpFormatter formatter = new HelpFormatter(); + formatter.setGroup(group); + formatter.setPrintWriter(pw); + formatter.setFooter("Specify HDFS directories while running on hadoop; else specify local file system directories"); + formatter.print(); + } + + public static void printHelpWithGenericOptions(Group group, OptionException oe) throws IOException { + new GenericOptionsParser(new Configuration(), new org.apache.commons.cli.Options(), new String[0]); + PrintWriter pw = new PrintWriter(new OutputStreamWriter(System.out, Charsets.UTF_8), true); + HelpFormatter formatter = new HelpFormatter(); + formatter.setGroup(group); + formatter.setPrintWriter(pw); + formatter.setException(oe); + formatter.print(); + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/HadoopUtil.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/HadoopUtil.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/HadoopUtil.java new file mode 100644 index 0000000..34515aa --- /dev/null +++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/HadoopUtil.java @@ -0,0 +1,435 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.common; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.filecache.DistributedCache; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.mahout.common.iterator.sequencefile.PathType; +import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterator; +import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class HadoopUtil { + + private static final Logger log = LoggerFactory.getLogger(HadoopUtil.class); + + private HadoopUtil() { } + + /** + * Create a map-only Hadoop Job out of the passed in parameters. Does not set the + * Job name. + * + * @see #getCustomJobName(String, org.apache.hadoop.mapreduce.JobContext, Class, Class) + */ + public static Job prepareJob(Path inputPath, + Path outputPath, + Class<? extends InputFormat> inputFormat, + Class<? extends Mapper> mapper, + Class<? extends Writable> mapperKey, + Class<? extends Writable> mapperValue, + Class<? extends OutputFormat> outputFormat, Configuration conf) throws IOException { + + Job job = new Job(new Configuration(conf)); + Configuration jobConf = job.getConfiguration(); + + if (mapper.equals(Mapper.class)) { + throw new IllegalStateException("Can't figure out the user class jar file from mapper/reducer"); + } + job.setJarByClass(mapper); + + job.setInputFormatClass(inputFormat); + jobConf.set("mapred.input.dir", inputPath.toString()); + + job.setMapperClass(mapper); + job.setMapOutputKeyClass(mapperKey); + job.setMapOutputValueClass(mapperValue); + job.setOutputKeyClass(mapperKey); + job.setOutputValueClass(mapperValue); + jobConf.setBoolean("mapred.compress.map.output", true); + job.setNumReduceTasks(0); + + job.setOutputFormatClass(outputFormat); + jobConf.set("mapred.output.dir", outputPath.toString()); + + return job; + } + + /** + * Create a map and reduce Hadoop job. Does not set the name on the job. 
+ * @param inputPath The input {@link org.apache.hadoop.fs.Path} + * @param outputPath The output {@link org.apache.hadoop.fs.Path} + * @param inputFormat The {@link org.apache.hadoop.mapreduce.InputFormat} + * @param mapper The {@link org.apache.hadoop.mapreduce.Mapper} class to use + * @param mapperKey The {@link org.apache.hadoop.io.Writable} key class. If the Mapper is a no-op, + * this value may be null + * @param mapperValue The {@link org.apache.hadoop.io.Writable} value class. If the Mapper is a no-op, + * this value may be null + * @param reducer The {@link org.apache.hadoop.mapreduce.Reducer} to use + * @param reducerKey The reducer key class. + * @param reducerValue The reducer value class. + * @param outputFormat The {@link org.apache.hadoop.mapreduce.OutputFormat}. + * @param conf The {@link org.apache.hadoop.conf.Configuration} to use. + * @return The {@link org.apache.hadoop.mapreduce.Job}. + * @throws IOException if there is a problem with the IO. + * + * @see #getCustomJobName(String, org.apache.hadoop.mapreduce.JobContext, Class, Class) + * @see #prepareJob(org.apache.hadoop.fs.Path, org.apache.hadoop.fs.Path, Class, Class, Class, Class, Class, + * org.apache.hadoop.conf.Configuration) + */ + public static Job prepareJob(Path inputPath, + Path outputPath, + Class<? extends InputFormat> inputFormat, + Class<? extends Mapper> mapper, + Class<? extends Writable> mapperKey, + Class<? extends Writable> mapperValue, + Class<? extends Reducer> reducer, + Class<? extends Writable> reducerKey, + Class<? extends Writable> reducerValue, + Class<? extends OutputFormat> outputFormat, + Configuration conf) throws IOException { + + Job job = new Job(new Configuration(conf)); + Configuration jobConf = job.getConfiguration(); + + if (reducer.equals(Reducer.class)) { + if (mapper.equals(Mapper.class)) { + throw new IllegalStateException("Can't figure out the user class jar file from mapper/reducer"); + } + job.setJarByClass(mapper); + } else { + job.setJarByClass(reducer); + } + + job.setInputFormatClass(inputFormat); + jobConf.set("mapred.input.dir", inputPath.toString()); + + job.setMapperClass(mapper); + if (mapperKey != null) { + job.setMapOutputKeyClass(mapperKey); + } + if (mapperValue != null) { + job.setMapOutputValueClass(mapperValue); + } + + jobConf.setBoolean("mapred.compress.map.output", true); + + job.setReducerClass(reducer); + job.setOutputKeyClass(reducerKey); + job.setOutputValueClass(reducerValue); + + job.setOutputFormatClass(outputFormat); + jobConf.set("mapred.output.dir", outputPath.toString()); + + return job; + } + + + public static String getCustomJobName(String className, JobContext job, + Class<? extends Mapper> mapper, + Class<? extends Reducer> reducer) { + StringBuilder name = new StringBuilder(100); + String customJobName = job.getJobName(); + if (customJobName == null || customJobName.trim().isEmpty()) { + name.append(className); + } else { + name.append(customJobName); + } + name.append('-').append(mapper.getSimpleName()); + name.append('-').append(reducer.getSimpleName()); + return name.toString(); + } + + + public static void delete(Configuration conf, Iterable<Path> paths) throws IOException { + if (conf == null) { + conf = new Configuration(); + } + for (Path path : paths) { + FileSystem fs = path.getFileSystem(conf); + if (fs.exists(path)) { + log.info("Deleting {}", path); + fs.delete(path, true); + } + } + } + + public static void delete(Configuration conf, Path... 
paths) throws IOException { + delete(conf, Arrays.asList(paths)); + } + + public static long countRecords(Path path, Configuration conf) throws IOException { + long count = 0; + Iterator<?> iterator = new SequenceFileValueIterator<>(path, true, conf); + while (iterator.hasNext()) { + iterator.next(); + count++; + } + return count; + } + + /** + * Count all the records in a directory using a + * {@link org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterator} + * + * @param path The {@link org.apache.hadoop.fs.Path} to count + * @param pt The {@link org.apache.mahout.common.iterator.sequencefile.PathType} + * @param filter Apply the {@link org.apache.hadoop.fs.PathFilter}. May be null + * @param conf The Hadoop {@link org.apache.hadoop.conf.Configuration} + * @return The number of records + * @throws IOException if there was an IO error + */ + public static long countRecords(Path path, PathType pt, PathFilter filter, Configuration conf) throws IOException { + long count = 0; + Iterator<?> iterator = new SequenceFileDirValueIterator<>(path, pt, filter, null, true, conf); + while (iterator.hasNext()) { + iterator.next(); + count++; + } + return count; + } + + public static InputStream openStream(Path path, Configuration conf) throws IOException { + FileSystem fs = FileSystem.get(path.toUri(), conf); + return fs.open(path.makeQualified(path.toUri(), path)); + } + + public static FileStatus[] getFileStatus(Path path, PathType pathType, PathFilter filter, + Comparator<FileStatus> ordering, Configuration conf) throws IOException { + FileStatus[] statuses; + FileSystem fs = path.getFileSystem(conf); + if (filter == null) { + statuses = pathType == PathType.GLOB ? fs.globStatus(path) : listStatus(fs, path); + } else { + statuses = pathType == PathType.GLOB ? fs.globStatus(path, filter) : listStatus(fs, path, filter); + } + if (ordering != null) { + Arrays.sort(statuses, ordering); + } + return statuses; + } + + public static FileStatus[] listStatus(FileSystem fs, Path path) throws IOException { + try { + return fs.listStatus(path); + } catch (FileNotFoundException e) { + return new FileStatus[0]; + } + } + + public static FileStatus[] listStatus(FileSystem fs, Path path, PathFilter filter) throws IOException { + try { + return fs.listStatus(path, filter); + } catch (FileNotFoundException e) { + return new FileStatus[0]; + } + } + + public static void cacheFiles(Path fileToCache, Configuration conf) { + DistributedCache.setCacheFiles(new URI[]{fileToCache.toUri()}, conf); + } + + /** + * Return the first cached file in the list; throws {@code IllegalStateException} if there are no cached files. + * @param conf - MapReduce Configuration + * @return Path of the cached file + * @throws IOException - IO Exception + */ + public static Path getSingleCachedFile(Configuration conf) throws IOException { + return getCachedFiles(conf)[0]; + } + + /** + * Retrieves paths to cached files.
+ * @param conf - MapReduce Configuration + * @return Path[] of Cached Files + * @throws IOException - IO Exception + * @throws IllegalStateException if no cache files are found + */ + public static Path[] getCachedFiles(Configuration conf) throws IOException { + LocalFileSystem localFs = FileSystem.getLocal(conf); + Path[] cacheFiles = DistributedCache.getLocalCacheFiles(conf); + + URI[] fallbackFiles = DistributedCache.getCacheFiles(conf); + + // fallback for local execution + if (cacheFiles == null) { + + Preconditions.checkState(fallbackFiles != null, "Unable to find cached files!"); + + cacheFiles = new Path[fallbackFiles.length]; + for (int n = 0; n < fallbackFiles.length; n++) { + cacheFiles[n] = new Path(fallbackFiles[n].getPath()); + } + } else { + + for (int n = 0; n < cacheFiles.length; n++) { + cacheFiles[n] = localFs.makeQualified(cacheFiles[n]); + // fallback for local execution + if (!localFs.exists(cacheFiles[n])) { + cacheFiles[n] = new Path(fallbackFiles[n].getPath()); + } + } + } + + Preconditions.checkState(cacheFiles.length > 0, "Unable to find cached files!"); + + return cacheFiles; + } + + public static void setSerializations(Configuration configuration) { + configuration.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization," + + "org.apache.hadoop.io.serializer.WritableSerialization"); + } + + public static void writeInt(int value, Path path, Configuration configuration) throws IOException { + FileSystem fs = FileSystem.get(path.toUri(), configuration); + try (FSDataOutputStream out = fs.create(path)) { + out.writeInt(value); + } + } + + public static int readInt(Path path, Configuration configuration) throws IOException { + FileSystem fs = FileSystem.get(path.toUri(), configuration); + try (FSDataInputStream in = fs.open(path)) { + return in.readInt(); + } + } + + /** + * Builds a comma-separated list of directories containing input files + * @param fs - File System + * @param fileStatus - File Status + * @return list of directories as a comma-separated String + * @throws IOException - IO Exception + */ + public static String buildDirList(FileSystem fs, FileStatus fileStatus) throws IOException { + boolean containsFiles = false; + List<String> directoriesList = new ArrayList<>(); + for (FileStatus childFileStatus : fs.listStatus(fileStatus.getPath())) { + if (childFileStatus.isDir()) { + String subDirectoryList = buildDirList(fs, childFileStatus); + directoriesList.add(subDirectoryList); + } else { + containsFiles = true; + } + } + + if (containsFiles) { + directoriesList.add(fileStatus.getPath().toUri().getPath()); + } + return Joiner.on(',').skipNulls().join(directoriesList.iterator()); + } + + /** + * Builds a comma-separated list of directories containing input files + * @param fs - File System + * @param fileStatus - File Status + * @param pathFilter - path filter + * @return list of directories as a comma-separated String + * @throws IOException - IO Exception + */ + public static String buildDirList(FileSystem fs, FileStatus fileStatus, PathFilter pathFilter) throws IOException { + boolean containsFiles = false; + List<String> directoriesList = new ArrayList<>(); + for (FileStatus childFileStatus : fs.listStatus(fileStatus.getPath(), pathFilter)) { + if (childFileStatus.isDir()) { + // recurse, keeping the same filter applied to subdirectories + String subDirectoryList = buildDirList(fs, childFileStatus, pathFilter); + directoriesList.add(subDirectoryList); + } else { + containsFiles = true; + } + } + + if (containsFiles) { + directoriesList.add(fileStatus.getPath().toUri().getPath()); + } + return
Joiner.on(',').skipNulls().join(directoriesList.iterator()); + } + + /** + * Calculates the path of filePath relative to the {@code baseinputpath} configuration property. + * + * @param configuration - configuration + * @param filePath - Input File Path + * @return relative file Path + * @throws IOException - IO Exception + */ + public static String calcRelativeFilePath(Configuration configuration, Path filePath) throws IOException { + FileSystem fs = filePath.getFileSystem(configuration); + FileStatus fst = fs.getFileStatus(filePath); + String currentPath = fst.getPath().toString().replaceFirst("file:", ""); + + String basePath = configuration.get("baseinputpath"); + if (!basePath.endsWith("/")) { + basePath += "/"; + } + basePath = basePath.replaceFirst("file:", ""); + String[] parts = currentPath.split(basePath); + + if (parts.length == 2) { + return parts[1]; + } else if (parts.length == 1) { + return parts[0]; + } + return currentPath; + } + + /** + * Finds a file in the DistributedCache + * + * @param partOfFilename a substring of the file name + * @param localFiles holds references to files stored in distributed cache + * @return Path to first matched file or null if nothing was found + **/ + public static Path findInCacheByPartOfFilename(String partOfFilename, URI[] localFiles) { + for (URI distCacheFile : localFiles) { + log.info("trying to find a file in distributed cache containing [{}] in its name", partOfFilename); + if (distCacheFile != null && distCacheFile.toString().contains(partOfFilename)) { + log.info("found file [{}] containing [{}]", distCacheFile.toString(), partOfFilename); + return new Path(distCacheFile.getPath()); + } + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/IntPairWritable.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/IntPairWritable.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/IntPairWritable.java new file mode 100644 index 0000000..dacd66f --- /dev/null +++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/IntPairWritable.java @@ -0,0 +1,270 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.common; + +import org.apache.hadoop.io.BinaryComparable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.WritableComparator; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; +import java.util.Arrays; + +/** + * A {@link WritableComparable} which encapsulates an ordered pair of signed integers.
+ */ +public final class IntPairWritable extends BinaryComparable + implements WritableComparable<BinaryComparable>, Cloneable { + + static final int INT_BYTE_LENGTH = 4; + static final int INT_PAIR_BYTE_LENGTH = 2 * INT_BYTE_LENGTH; + private byte[] b = new byte[INT_PAIR_BYTE_LENGTH]; + + public IntPairWritable() { + setFirst(0); + setSecond(0); + } + + public IntPairWritable(IntPairWritable pair) { + b = Arrays.copyOf(pair.getBytes(), INT_PAIR_BYTE_LENGTH); + } + + public IntPairWritable(int x, int y) { + putInt(x, b, 0); + putInt(y, b, INT_BYTE_LENGTH); + } + + public void set(int x, int y) { + putInt(x, b, 0); + putInt(y, b, INT_BYTE_LENGTH); + } + + public void setFirst(int x) { + putInt(x, b, 0); + } + + public int getFirst() { + return getInt(b, 0); + } + + public void setSecond(int y) { + putInt(y, b, INT_BYTE_LENGTH); + } + + public int getSecond() { + return getInt(b, INT_BYTE_LENGTH); + } + + @Override + public void readFields(DataInput in) throws IOException { + in.readFully(b); + } + + @Override + public void write(DataOutput out) throws IOException { + out.write(b); + } + + @Override + public int hashCode() { + return Arrays.hashCode(b); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!super.equals(obj)) { + return false; + } + if (!(obj instanceof IntPairWritable)) { + return false; + } + IntPairWritable other = (IntPairWritable) obj; + return Arrays.equals(b, other.b); + } + + @Override + public int compareTo(BinaryComparable other) { + return Comparator.doCompare(b, 0, ((IntPairWritable) other).b, 0); + } + + @Override + public Object clone() { + return new IntPairWritable(this); + } + + @Override + public String toString() { + return "(" + getFirst() + ", " + getSecond() + ')'; + } + + @Override + public byte[] getBytes() { + return b; + } + + @Override + public int getLength() { + return INT_PAIR_BYTE_LENGTH; + } + + private static void putInt(int value, byte[] b, int offset) { + for (int i = offset, j = 24; j >= 0; i++, j -= 8) { + b[i] = (byte) (value >> j); + } + } + + private static int getInt(byte[] b, int offset) { + int value = 0; + for (int i = offset, j = 24; j >= 0; i++, j -= 8) { + value |= (b[i] & 0xFF) << j; + } + return value; + } + + static { + WritableComparator.define(IntPairWritable.class, new Comparator()); + } + + public static final class Comparator extends WritableComparator implements Serializable { + public Comparator() { + super(IntPairWritable.class); + } + + @Override + public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { + return doCompare(b1, s1, b2, s2); + } + + static int doCompare(byte[] b1, int s1, byte[] b2, int s2) { + int compare1 = compareInts(b1, s1, b2, s2); + if (compare1 != 0) { + return compare1; + } + return compareInts(b1, s1 + INT_BYTE_LENGTH, b2, s2 + INT_BYTE_LENGTH); + } + + private static int compareInts(byte[] b1, int s1, byte[] b2, int s2) { + // Like WritableComparator.compareBytes(), but treats first byte as signed value + int end1 = s1 + INT_BYTE_LENGTH; + for (int i = s1, j = s2; i < end1; i++, j++) { + int a = b1[i]; + int b = b2[j]; + if (i > s1) { + a &= 0xff; + b &= 0xff; + } + if (a != b) { + return a - b; + } + } + return 0; + } + } + + /** + * Compare only the first part of the pair, so that reduce is called once for each value of the first part. 
+ */ + public static class FirstGroupingComparator extends WritableComparator implements Serializable { + + public FirstGroupingComparator() { + super(IntPairWritable.class); + } + + @Override + public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { + int firstb1 = WritableComparator.readInt(b1, s1); + int firstb2 = WritableComparator.readInt(b2, s2); + if (firstb1 < firstb2) { + return -1; + } else if (firstb1 > firstb2) { + return 1; + } else { + return 0; + } + } + + @Override + public int compare(Object o1, Object o2) { + int firstb1 = ((IntPairWritable) o1).getFirst(); + int firstb2 = ((IntPairWritable) o2).getFirst(); + if (firstb1 < firstb2) { + return -1; + } + if (firstb1 > firstb2) { + return 1; + } + return 0; + } + + } + + /** A wrapper class that associates pairs with frequency (Occurrences) */ + public static class Frequency implements Comparable<Frequency>, Serializable { + + private final IntPairWritable pair; + private final double frequency; + + public Frequency(IntPairWritable bigram, double frequency) { + this.pair = new IntPairWritable(bigram); + this.frequency = frequency; + } + + public double getFrequency() { + return frequency; + } + + public IntPairWritable getPair() { + return pair; + } + + @Override + public int hashCode() { + return pair.hashCode() + RandomUtils.hashDouble(frequency); + } + + @Override + public boolean equals(Object right) { + if (!(right instanceof Frequency)) { + return false; + } + Frequency that = (Frequency) right; + return pair.equals(that.pair) && frequency == that.frequency; + } + + @Override + public int compareTo(Frequency that) { + if (frequency < that.frequency) { + return -1; + } + if (frequency > that.frequency) { + return 1; + } + return 0; + } + + @Override + public String toString() { + return pair + "\t" + frequency; + } + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/IntegerTuple.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/IntegerTuple.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/IntegerTuple.java new file mode 100644 index 0000000..f456d4d --- /dev/null +++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/IntegerTuple.java @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/ + +package org.apache.mahout.common; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +import com.google.common.collect.Lists; +import org.apache.hadoop.io.WritableComparable; + +/** + * An ordered list of integers which can be used in a Hadoop Map/Reduce job. + */ +public final class IntegerTuple implements WritableComparable<IntegerTuple> { + + private List<Integer> tuple = Lists.newArrayList(); + + public IntegerTuple() { } + + public IntegerTuple(Integer firstEntry) { + add(firstEntry); + } + + public IntegerTuple(Iterable<Integer> entries) { + for (Integer entry : entries) { + add(entry); + } + } + + public IntegerTuple(Integer[] entries) { + for (Integer entry : entries) { + add(entry); + } + } + + /** + * Add an entry to the end of the list + * + * @param entry + * @return true if the item was added + */ + public boolean add(Integer entry) { + return tuple.add(entry); + } + + /** + * Fetches the integer at the given location + * + * @param index + * @return Integer value at the given location in the tuple list + */ + public Integer integerAt(int index) { + return tuple.get(index); + } + + /** + * Replaces the integer at the given index with the given newInteger + * + * @param index + * @param newInteger + * @return The previous value at that location + */ + public Integer replaceAt(int index, Integer newInteger) { + return tuple.set(index, newInteger); + } + + /** + * Fetch the list of entries from the tuple + * + * @return a List containing the integers in the order of insertion + */ + public List<Integer> getEntries() { + return Collections.unmodifiableList(this.tuple); + } + + /** + * Returns the length of the tuple + * + * @return length + */ + public int length() { + return this.tuple.size(); + } + + @Override + public String toString() { + return tuple.toString(); + } + + @Override + public int hashCode() { + return tuple.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + IntegerTuple other = (IntegerTuple) obj; + if (tuple == null) { + if (other.tuple != null) { + return false; + } + } else if (!tuple.equals(other.tuple)) { + return false; + } + return true; + } + + @Override + public void readFields(DataInput in) throws IOException { + int len = in.readInt(); + tuple = Lists.newArrayListWithCapacity(len); + for (int i = 0; i < len; i++) { + int data = in.readInt(); + tuple.add(data); + } + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(tuple.size()); + for (Integer entry : tuple) { + out.writeInt(entry); + } + } + + @Override + public int compareTo(IntegerTuple otherTuple) { + int thisLength = length(); + int otherLength = otherTuple.length(); + int min = Math.min(thisLength, otherLength); + for (int i = 0; i < min; i++) { + int ret = this.tuple.get(i).compareTo(otherTuple.integerAt(i)); + if (ret == 0) { + continue; + } + return ret; + } + if (thisLength < otherLength) { + return -1; + } else if (thisLength > otherLength) { + return 1; + } else { + return 0; + } + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/LongPair.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/LongPair.java
diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/LongPair.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/LongPair.java
new file mode 100644
index 0000000..5215e3a
--- /dev/null
+++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/LongPair.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common;
+
+import java.io.Serializable;
+
+import com.google.common.primitives.Longs;
+
+/** A simple (ordered) pair of longs. */
+public final class LongPair implements Comparable<LongPair>, Serializable {
+
+  private final long first;
+  private final long second;
+
+  public LongPair(long first, long second) {
+    this.first = first;
+    this.second = second;
+  }
+
+  public long getFirst() {
+    return first;
+  }
+
+  public long getSecond() {
+    return second;
+  }
+
+  public LongPair swap() {
+    return new LongPair(second, first);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof LongPair)) {
+      return false;
+    }
+    LongPair otherPair = (LongPair) obj;
+    return first == otherPair.getFirst() && second == otherPair.getSecond();
+  }
+
+  @Override
+  public int hashCode() {
+    int firstHash = Longs.hashCode(first);
+    // Flip top and bottom 16 bits; this makes the hash function probably different
+    // for (a,b) versus (b,a)
+    return (firstHash >>> 16 | firstHash << 16) ^ Longs.hashCode(second);
+  }
+
+  @Override
+  public String toString() {
+    return '(' + String.valueOf(first) + ',' + second + ')';
+  }
+
+  @Override
+  public int compareTo(LongPair o) {
+    if (first < o.getFirst()) {
+      return -1;
+    } else if (first > o.getFirst()) {
+      return 1;
+    } else {
+      return second < o.getSecond() ? -1 : second > o.getSecond() ? 1 : 0;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/MemoryUtil.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/MemoryUtil.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/MemoryUtil.java
new file mode 100644
index 0000000..f241b53
--- /dev/null
+++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/MemoryUtil.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.common;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Memory utilities.
+ */
+public final class MemoryUtil {
+
+  private static final Logger log = LoggerFactory.getLogger(MemoryUtil.class);
+
+  private MemoryUtil() {
+  }
+
+  /**
+   * Logs current heap memory statistics.
+   *
+   * @see Runtime
+   */
+  public static void logMemoryStatistics() {
+    Runtime runtime = Runtime.getRuntime();
+    long freeBytes = runtime.freeMemory();
+    long maxBytes = runtime.maxMemory();
+    long totalBytes = runtime.totalMemory();
+    long usedBytes = totalBytes - freeBytes;
+    log.info("Memory (bytes): {} used, {} heap, {} max", usedBytes, totalBytes,
+        maxBytes);
+  }
+
+  private static volatile ScheduledExecutorService scheduler;
+
+  /**
+   * Constructs and starts a memory logger thread.
+   *
+   * @param rateInMillis how often memory info should be logged, in milliseconds
+   */
+  public static void startMemoryLogger(long rateInMillis) {
+    stopMemoryLogger();
+    scheduler = Executors.newScheduledThreadPool(1, new ThreadFactory() {
+      private final ThreadFactory delegate = Executors.defaultThreadFactory();
+
+      @Override
+      public Thread newThread(Runnable r) {
+        Thread t = delegate.newThread(r);
+        t.setDaemon(true);
+        return t;
+      }
+    });
+    Runnable memoryLoggerRunnable = new Runnable() {
+      @Override
+      public void run() {
+        logMemoryStatistics();
+      }
+    };
+    scheduler.scheduleAtFixedRate(memoryLoggerRunnable, rateInMillis, rateInMillis,
+        TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Constructs and starts a memory logger thread with a logging rate of 1000 milliseconds.
+   */
+  public static void startMemoryLogger() {
+    startMemoryLogger(1000);
+  }
+
+  /**
+   * Stops the memory logger, if any, started via {@link #startMemoryLogger(long)} or
+   * {@link #startMemoryLogger()}.
+   */
+  public static void stopMemoryLogger() {
+    if (scheduler != null) {
+      scheduler.shutdownNow();
+      scheduler = null;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/Pair.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/Pair.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/Pair.java
new file mode 100644
index 0000000..d2ad6a1
--- /dev/null
+++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/Pair.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common;
+
+import java.io.Serializable;
+
+/** A simple (ordered) pair of two objects. Elements may be null. */
+public final class Pair<A,B> implements Comparable<Pair<A,B>>, Serializable {
+
+  private final A first;
+  private final B second;
+
+  public Pair(A first, B second) {
+    this.first = first;
+    this.second = second;
+  }
+
+  public A getFirst() {
+    return first;
+  }
+
+  public B getSecond() {
+    return second;
+  }
+
+  public Pair<B, A> swap() {
+    return new Pair<>(second, first);
+  }
+
+  public static <A,B> Pair<A,B> of(A a, B b) {
+    return new Pair<>(a, b);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof Pair<?, ?>)) {
+      return false;
+    }
+    Pair<?, ?> otherPair = (Pair<?, ?>) obj;
+    return isEqualOrNulls(first, otherPair.getFirst())
+        && isEqualOrNulls(second, otherPair.getSecond());
+  }
+
+  private static boolean isEqualOrNulls(Object obj1, Object obj2) {
+    return obj1 == null ? obj2 == null : obj1.equals(obj2);
+  }
+
+  @Override
+  public int hashCode() {
+    int firstHash = hashCodeNull(first);
+    // Flip top and bottom 16 bits; this makes the hash function probably different
+    // for (a,b) versus (b,a)
+    return (firstHash >>> 16 | firstHash << 16) ^ hashCodeNull(second);
+  }
+
+  private static int hashCodeNull(Object obj) {
+    return obj == null ? 0 : obj.hashCode();
+  }
+
+  @Override
+  public String toString() {
+    return '(' + String.valueOf(first) + ',' + second + ')';
+  }
+
+  /**
+   * Defines an ordering on pairs that sorts by first value's natural ordering, ascending,
+   * and then by second value's natural ordering.
+   *
+   * @throws ClassCastException if types are not actually {@link Comparable}
+   */
+  @Override
+  public int compareTo(Pair<A,B> other) {
+    Comparable<A> thisFirst = (Comparable<A>) first;
+    A thatFirst = other.getFirst();
+    int compare = thisFirst.compareTo(thatFirst);
+    if (compare != 0) {
+      return compare;
+    }
+    Comparable<B> thisSecond = (Comparable<B>) second;
+    B thatSecond = other.getSecond();
+    return thisSecond.compareTo(thatSecond);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/Parameters.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/Parameters.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/Parameters.java
new file mode 100644
index 0000000..e74c534
--- /dev/null
+++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/Parameters.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common;
+
+import java.io.IOException;
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.util.GenericsUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Parameters {
+
+  private static final Logger log = LoggerFactory.getLogger(Parameters.class);
+
+  private Map<String,String> params = Maps.newHashMap();
+
+  public Parameters() {
+  }
+
+  public Parameters(String serializedString) throws IOException {
+    this(parseParams(serializedString));
+  }
+
+  protected Parameters(Map<String,String> params) {
+    this.params = params;
+  }
+
+  public String get(String key) {
+    return params.get(key);
+  }
+
+  public String get(String key, String defaultValue) {
+    String ret = params.get(key);
+    return ret == null ? defaultValue : ret;
+  }
+
+  public void set(String key, String value) {
+    params.put(key, value);
+  }
+
+  public int getInt(String key, int defaultValue) {
+    String ret = params.get(key);
+    return ret == null ? defaultValue : Integer.parseInt(ret);
+  }
+
+  @Override
+  public String toString() {
+    Configuration conf = new Configuration();
+    conf.set("io.serializations",
+        "org.apache.hadoop.io.serializer.JavaSerialization,"
+            + "org.apache.hadoop.io.serializer.WritableSerialization");
+    DefaultStringifier<Map<String,String>> mapStringifier = new DefaultStringifier<>(conf,
+        GenericsUtil.getClass(params));
+    try {
+      return mapStringifier.toString(params);
+    } catch (IOException e) {
+      log.info("Encountered IOException while serializing the parameter map; returning an empty string", e);
+      return "";
+    }
+  }
+
+  public String print() {
+    return params.toString();
+  }
+
+  public static Map<String,String> parseParams(String serializedString) throws IOException {
+    Configuration conf = new Configuration();
+    conf.set("io.serializations",
+        "org.apache.hadoop.io.serializer.JavaSerialization,"
+            + "org.apache.hadoop.io.serializer.WritableSerialization");
+    Map<String,String> params = Maps.newHashMap();
+    DefaultStringifier<Map<String,String>> mapStringifier = new DefaultStringifier<>(conf,
+        GenericsUtil.getClass(params));
+    return mapStringifier.fromString(serializedString);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/StringTuple.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/StringTuple.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/StringTuple.java
new file mode 100644
index 0000000..0de1a4a
--- /dev/null
+++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/StringTuple.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * An ordered list of strings which can be used as a key or value in a Hadoop Map/Reduce job.
+ */
+public final class StringTuple implements WritableComparable<StringTuple> {
+
+  private List<String> tuple = Lists.newArrayList();
+
+  public StringTuple() { }
+
+  public StringTuple(String firstEntry) {
+    add(firstEntry);
+  }
+
+  public StringTuple(Iterable<String> entries) {
+    for (String entry : entries) {
+      add(entry);
+    }
+  }
+
+  public StringTuple(String[] entries) {
+    for (String entry : entries) {
+      add(entry);
+    }
+  }
+
+  /**
+   * Adds an entry to the end of the list
+   *
+   * @param entry the String to add
+   * @return true if the entry was added
+   */
+  public boolean add(String entry) {
+    return tuple.add(entry);
+  }
+
+  /**
+   * Fetches the string at the given location
+   *
+   * @param index
+   * @return String value at the given location in the tuple list
+   */
+  public String stringAt(int index) {
+    return tuple.get(index);
+  }
+
+  /**
+   * Replaces the string at the given index with the given newString
+   *
+   * @param index
+   * @param newString
+   * @return The previous value at that location
+   */
+  public String replaceAt(int index, String newString) {
+    return tuple.set(index, newString);
+  }
+
+  /**
+   * Fetch the list of entries from the tuple
+   *
+   * @return a List containing the strings in the order of insertion
+   */
+  public List<String> getEntries() {
+    return Collections.unmodifiableList(this.tuple);
+  }
+
+  /**
+   * Returns the length of the tuple
+   *
+   * @return length
+   */
+  public int length() {
+    return this.tuple.size();
+  }
+
+  @Override
+  public String toString() {
+    return tuple.toString();
+  }
+
+  @Override
+  public int hashCode() {
+    return tuple.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    StringTuple other = (StringTuple) obj;
+    if (tuple == null) {
+      if (other.tuple != null) {
+        return false;
+      }
+    } else if (!tuple.equals(other.tuple)) {
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int len = in.readInt();
+    tuple = Lists.newArrayListWithCapacity(len);
+    Text value = new Text();
+    for (int i = 0; i < len; i++) {
+      value.readFields(in);
+      tuple.add(value.toString());
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(tuple.size());
+    Text value = new Text();
+    for (String entry : tuple) {
+      value.set(entry);
+      value.write(out);
+    }
+  }
+
+  @Override
+  public int compareTo(StringTuple otherTuple) {
+    int thisLength = length();
+    int otherLength = otherTuple.length();
+    int min = Math.min(thisLength, otherLength);
+    for (int i = 0; i < min; i++) {
+      int ret = this.tuple.get(i).compareTo(otherTuple.stringAt(i));
+      if (ret != 0) {
+        return ret;
+      }
+    }
+    if (thisLength < otherLength) {
+      return -1;
+    } else if (thisLength > otherLength) {
+      return 1;
+    } else {
+      return 0;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/StringUtils.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/StringUtils.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/StringUtils.java
new file mode 100644
index 0000000..a064596
--- /dev/null
+++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/StringUtils.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common;
+
+import java.util.regex.Pattern;
+
+import com.thoughtworks.xstream.XStream;
+
+/**
+ * Offers two methods: one converts an object to a one-line string representation, and the other
+ * restores an object from such a string representation. Callers should use the Hadoop Stringifier
+ * whenever it is available instead.
+ */
+public final class StringUtils {
+
+  private static final XStream XSTREAM = new XStream();
+  private static final Pattern NEWLINE_PATTERN = Pattern.compile("\n");
+  private static final Pattern XMLRESERVED = Pattern.compile("\"|\\&|\\<|\\>|\'");
+
+  private StringUtils() {
+    // do nothing
+  }
+
+  /**
+   * Converts the object to a one-line string representation
+   *
+   * @param obj
+   *          the object to convert
+   * @return the string representation of the object
+   */
+  public static String toString(Object obj) {
+    return NEWLINE_PATTERN.matcher(XSTREAM.toXML(obj)).replaceAll("");
+  }
+
+  /**
+   * Restores the object from its string representation.
+   *
+   * @param str
+   *          the string representation of the object
+   * @return restored object
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> T fromString(String str) {
+    return (T) XSTREAM.fromXML(str);
+  }
+
+  public static String escapeXML(CharSequence input) {
+    return XMLRESERVED.matcher(input).replaceAll("_");
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/TimingStatistics.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/TimingStatistics.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/TimingStatistics.java
new file mode 100644
index 0000000..5ee2066
--- /dev/null
+++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/common/TimingStatistics.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common;
+
+import java.io.Serializable;
+import java.text.DecimalFormat;
+
+public final class TimingStatistics implements Serializable {
+  private static final DecimalFormat DF = new DecimalFormat("#.##");
+  private int nCalls;
+  private long minTime;
+  private long maxTime;
+  private long sumTime;
+  private long leadSumTime;
+  private double sumSquaredTime;
+
+  /** Creates a new instance of TimingStatistics */
+  public TimingStatistics() { }
+
+  public TimingStatistics(int nCalls, long minTime, long maxTime, long sumTime, double sumSquaredTime) {
+    this.nCalls = nCalls;
+    this.minTime = minTime;
+    this.maxTime = maxTime;
+    this.sumTime = sumTime;
+    this.sumSquaredTime = sumSquaredTime;
+  }
+
+  public synchronized int getNCalls() {
+    return nCalls;
+  }
+
+  public synchronized long getMinTime() {
+    return Math.max(0, minTime);
+  }
+
+  public synchronized long getMaxTime() {
+    return maxTime;
+  }
+
+  public synchronized long getSumTime() {
+    return sumTime;
+  }
+
+  public synchronized double getSumSquaredTime() {
+    return sumSquaredTime;
+  }
+
+  public synchronized long getMeanTime() {
+    return nCalls == 0 ? 0 : sumTime / nCalls;
+  }
+
+  public synchronized long getStdDevTime() {
+    if (nCalls == 0) {
+      return 0;
+    }
+    double mean = getMeanTime();
+    double meanSquared = mean * mean;
+    double meanOfSquares = sumSquaredTime / nCalls;
+    double variance = meanOfSquares - meanSquared;
+    if (variance < 0) {
+      return 0; // might happen due to rounding error
+    }
+    return (long) Math.sqrt(variance);
+  }
+
+  @Override
+  public synchronized String toString() {
+    return '\n'
+        + "nCalls = " + nCalls + ";\n"
+        + "sum = " + DF.format(sumTime / 1000000000.0) + "s;\n"
+        + "min = " + DF.format(minTime / 1000000.0) + "ms;\n"
+        + "max = " + DF.format(maxTime / 1000000.0) + "ms;\n"
+        + "mean = " + DF.format(getMeanTime() / 1000.0) + "us;\n"
+        + "stdDev = " + DF.format(getStdDevTime() / 1000.0) + "us;";
+  }
+
+  /** Returns a Call whose timing is discarded until the accumulated lead time has elapsed.
+   * The caller should allow enough lead time for the JIT to warm up. */
+  public Call newCall(long leadTimeUsec) {
+    // leadSumTime is accumulated in nanoseconds, so convert the microsecond threshold
+    if (leadSumTime > leadTimeUsec * 1000) {
+      return new Call();
+    } else {
+      return new LeadTimeCall();
+    }
+  }
+
+  /** A Call that does not update the performance metrics; it only accumulates lead time.
+   * This gives the JIT enough time to warm up before measurement starts. */
+  public final class LeadTimeCall extends Call {
+
+    private LeadTimeCall() { }
+
+    @Override
+    public void end() {
+      long elapsed = System.nanoTime() - startTime;
+      synchronized (TimingStatistics.this) {
+        leadSumTime += elapsed;
+      }
+    }
+
+    @Override
+    public boolean end(long sumMaxUsec) {
+      end();
+      return false;
+    }
+  }
+
+  /**
+   * A call object that can update performance metrics.
+   */
+  public class Call {
+    protected final long startTime = System.nanoTime();
+
+    private Call() { }
+
+    public void end() {
+      long elapsed = System.nanoTime() - startTime;
+      synchronized (TimingStatistics.this) {
+        nCalls++;
+        if (elapsed < minTime || nCalls == 1) {
+          minTime = elapsed;
+        }
+        if (elapsed > maxTime) {
+          maxTime = elapsed;
+        }
+        sumTime += elapsed;
+        // widen before multiplying to avoid long overflow for calls longer than ~3 seconds
+        sumSquaredTime += (double) elapsed * elapsed;
+      }
+    }
+
+    /**
+     * Returns true if sumTime has reached the given limit.
+     */
+    public boolean end(long sumMaxUsec) {
+      end();
+      // sumTime is in nanoseconds; convert the microsecond limit before comparing
+      return sumMaxUsec * 1000 < sumTime;
+    }
+  }
+}
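To make the timing API above concrete, here is a minimal usage sketch. It is illustrative only and not part of the patch: the class name TimingStatisticsExample, the 10,000 us lead time, the loop bound, and the Math.sqrt stand-in for real work are all assumptions made for the example.

    // Illustrative sketch only; not part of the patch above.
    import org.apache.mahout.common.TimingStatistics;

    public class TimingStatisticsExample {
      public static void main(String[] args) {
        TimingStatistics stats = new TimingStatistics();
        double sink = 0;
        for (int i = 0; i < 100000; i++) {
          // Calls created before 10,000 us of lead time has accumulated are
          // LeadTimeCalls, so JIT warm-up is excluded from the statistics.
          TimingStatistics.Call call = stats.newCall(10000);
          sink += Math.sqrt(i); // stand-in for the work being timed (assumption)
          call.end();
        }
        System.out.println(stats); // nCalls, sum, min, max, mean, stdDev
        System.out.println(sink);  // keep the work from being optimized away
      }
    }

Because warm-up iterations go through LeadTimeCall.end(), they only advance leadSumTime and never touch nCalls, minTime, maxTime, or the sums, which is what keeps the reported mean and standard deviation free of JIT noise.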

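Similarly, a short round-trip sketch for the Writable tuples above (StringTuple here; IntegerTuple behaves the same, with writeInt/readInt per entry). The class name and sample values are assumptions made for the example; the stream classes are standard java.io.

    // Illustrative sketch only; not part of the patch above.
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.mahout.common.StringTuple;

    public class StringTupleRoundTrip {
      public static void main(String[] args) throws IOException {
        StringTuple original = new StringTuple(new String[] {"alpha", "beta"});

        // write() emits the list size, then each entry as a Hadoop Text
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        // readFields() rebuilds the tuple from the same layout
        StringTuple copy = new StringTuple();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

        System.out.println(copy.equals(original));    // true
        System.out.println(copy.compareTo(original)); // 0
      }
    }

This symmetry between write() and readFields() is exactly what Hadoop relies on when it shuffles these tuples between mappers and reducers.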