http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansUtilsMR.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansUtilsMR.java b/mr/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansUtilsMR.java new file mode 100644 index 0000000..4bffb2b --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansUtilsMR.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.mahout.clustering.streaming.mapreduce;

import java.io.IOException;
import java.util.Iterator;

import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.io.Closeables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.mahout.common.ClassUtils;
import org.apache.mahout.common.commandline.DefaultOptionCreator;
import org.apache.mahout.common.distance.DistanceMeasure;
import org.apache.mahout.math.Centroid;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
import org.apache.mahout.math.neighborhood.BruteSearch;
import org.apache.mahout.math.neighborhood.FastProjectionSearch;
import org.apache.mahout.math.neighborhood.LocalitySensitiveHashSearch;
import org.apache.mahout.math.neighborhood.ProjectionSearch;
import org.apache.mahout.math.neighborhood.UpdatableSearcher;

/**
 * Helpers for the MapReduce implementation of streaming k-means: building a searcher from a job
 * {@link Configuration}, adapting vectors to {@link Centroid}s and writing them to sequence files.
 */
public final class StreamingKMeansUtilsMR {

  private StreamingKMeansUtilsMR() {
  }

  /**
   * Instantiates a searcher from a given configuration.
   *
   * @param conf the configuration
   * @return the instantiated searcher
   * @throws RuntimeException if the distance measure class cannot be instantiated
   * @throws NullPointerException if no searcher class is configured
   * @throws IllegalStateException if an unknown searcher class was requested
   */
  public static UpdatableSearcher searcherFromConfiguration(Configuration conf) {
    DistanceMeasure distanceMeasure;
    String distanceMeasureClass = conf.get(DefaultOptionCreator.DISTANCE_MEASURE_OPTION);
    try {
      distanceMeasure = (DistanceMeasure) Class.forName(distanceMeasureClass).getConstructor().newInstance();
    } catch (Exception e) {
      throw new RuntimeException("Failed to instantiate distanceMeasure " + distanceMeasureClass, e);
    }

    int numProjections = conf.getInt(StreamingKMeansDriver.NUM_PROJECTIONS_OPTION, 20);
    int searchSize = conf.getInt(StreamingKMeansDriver.SEARCH_SIZE_OPTION, 10);

    // Fail with a descriptive message instead of a bare NPE on the equals() calls below.
    String searcherClass = Preconditions.checkNotNull(conf.get(StreamingKMeansDriver.SEARCHER_CLASS_OPTION),
        "No searcher class configured (%s)", StreamingKMeansDriver.SEARCHER_CLASS_OPTION);

    if (searcherClass.equals(BruteSearch.class.getName())) {
      return ClassUtils.instantiateAs(searcherClass, UpdatableSearcher.class,
          new Class[]{DistanceMeasure.class}, new Object[]{distanceMeasure});
    } else if (searcherClass.equals(FastProjectionSearch.class.getName())
        || searcherClass.equals(ProjectionSearch.class.getName())) {
      return ClassUtils.instantiateAs(searcherClass, UpdatableSearcher.class,
          new Class[]{DistanceMeasure.class, int.class, int.class},
          new Object[]{distanceMeasure, numProjections, searchSize});
    } else if (searcherClass.equals(LocalitySensitiveHashSearch.class.getName())) {
      return ClassUtils.instantiateAs(searcherClass, LocalitySensitiveHashSearch.class,
          new Class[]{DistanceMeasure.class, int.class},
          new Object[]{distanceMeasure, searchSize});
    } else {
      throw new IllegalStateException("Unknown class instantiation requested: " + searcherClass);
    }
  }

  /**
   * Returns an Iterable of centroids from an Iterable of VectorWritables by creating a new Centroid containing
   * a RandomAccessSparseVector as a delegate for each VectorWritable.
   * <p/>
   * The result is a lazy view over {@code inputIterable}. Every call to {@code iterator()} starts numbering the
   * centroids from 0 again, so iterating the view several times yields the same indices each time.
   *
   * @param inputIterable VectorWritable Iterable to get Centroids from
   * @return the new Centroids
   */
  public static Iterable<Centroid> getCentroidsFromVectorWritable(final Iterable<VectorWritable> inputIterable) {
    return new Iterable<Centroid>() {
      @Override
      public Iterator<Centroid> iterator() {
        // A fresh (stateful) function per iterator keeps the numbering independent between passes.
        return Iterables.transform(inputIterable, new VectorWritableToCentroid()).iterator();
      }
    };
  }

  /**
   * Returns an Iterable of Centroid from an Iterable of Vector by either casting each Vector to Centroid (if the
   * instance extends Centroid) or create a new Centroid based on that Vector.
   * The implicit expectation is that the input will not have interleaving types of vectors. Otherwise, the numbering
   * of new Centroids will become invalid.
   * <p/>
   * The result is a lazy view; every call to {@code iterator()} restarts the numbering of new Centroids at 0.
   *
   * @param input Iterable of Vectors to cast
   * @return the new Centroids
   */
  public static Iterable<Centroid> castVectorsToCentroids(final Iterable<Vector> input) {
    return new Iterable<Centroid>() {
      @Override
      public Iterator<Centroid> iterator() {
        return Iterables.transform(input, new VectorToCentroid()).iterator();
      }
    };
  }

  /**
   * Writes centroids to a sequence file, keyed by their position (0, 1, 2, ...) in {@code centroids}.
   *
   * @param centroids the centroids to write.
   * @param path the path of the output file.
   * @param conf the configuration for the HDFS to write the file to.
   * @throws java.io.IOException if creating, writing or closing the file fails
   */
  public static void writeCentroidsToSequenceFile(Iterable<Centroid> centroids, Path path, Configuration conf)
    throws IOException {
    SequenceFile.Writer writer = SequenceFile.createWriter(FileSystem.get(conf), conf,
        path, IntWritable.class, CentroidWritable.class);
    boolean threw = true;
    try {
      int i = 0;
      for (Centroid centroid : centroids) {
        writer.append(new IntWritable(i++), new CentroidWritable(centroid));
      }
      threw = false;
    } finally {
      // Only swallow a close() failure when the body already threw; otherwise a failed close (which can lose
      // buffered records) must reach the caller.
      Closeables.close(writer, threw);
    }
  }

  /**
   * Writes vectors to a sequence file, keyed by their position (0, 1, 2, ...) in {@code datapoints}.
   *
   * @param datapoints the vectors to write.
   * @param path the path of the output file.
   * @param conf the configuration for the HDFS to write the file to.
   * @throws java.io.IOException if creating, writing or closing the file fails
   */
  public static void writeVectorsToSequenceFile(Iterable<? extends Vector> datapoints, Path path, Configuration conf)
    throws IOException {
    SequenceFile.Writer writer = SequenceFile.createWriter(FileSystem.get(conf), conf,
        path, IntWritable.class, VectorWritable.class);
    boolean threw = true;
    try {
      int i = 0;
      for (Vector vector : datapoints) {
        writer.append(new IntWritable(i++), new VectorWritable(vector));
      }
      threw = false;
    } finally {
      Closeables.close(writer, threw);
    }
  }

  /** Wraps each VectorWritable in a new Centroid backed by a RandomAccessSparseVector, numbered in order. */
  private static final class VectorWritableToCentroid implements Function<VectorWritable, Centroid> {
    private int numVectors = 0;

    @Override
    public Centroid apply(VectorWritable input) {
      Preconditions.checkNotNull(input);
      return new Centroid(numVectors++, new RandomAccessSparseVector(input.get()), 1);
    }
  }

  /** Passes Centroids through unchanged and wraps any other Vector in a new Centroid, numbered in order. */
  private static final class VectorToCentroid implements Function<Vector, Centroid> {
    private int numVectors = 0;

    @Override
    public Centroid apply(Vector input) {
      Preconditions.checkNotNull(input);
      if (input instanceof Centroid) {
        return (Centroid) input;
      } else {
        return new Centroid(numVectors++, input, 1);
      }
    }
  }
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/streaming/tools/ResplitSequenceFiles.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/clustering/streaming/tools/ResplitSequenceFiles.java b/mr/src/main/java/org/apache/mahout/clustering/streaming/tools/ResplitSequenceFiles.java new file mode 100644 index 0000000..55b7848 --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/clustering/streaming/tools/ResplitSequenceFiles.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.clustering.streaming.tools; + +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.util.Iterator; + +import com.google.common.base.Charsets; +import com.google.common.collect.Iterables; +import org.apache.commons.cli2.CommandLine; +import org.apache.commons.cli2.Group; +import org.apache.commons.cli2.Option; +import org.apache.commons.cli2.builder.ArgumentBuilder; +import org.apache.commons.cli2.builder.DefaultOptionBuilder; +import org.apache.commons.cli2.builder.GroupBuilder; +import org.apache.commons.cli2.commandline.Parser; +import org.apache.commons.cli2.util.HelpFormatter; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Writable; +import org.apache.mahout.common.Pair; +import org.apache.mahout.common.iterator.sequencefile.PathType; +import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable; + +public class ResplitSequenceFiles { + + private String inputFile; + private String outputFileBase; + private int numSplits; + + private Configuration conf; + private FileSystem fs; + + private ResplitSequenceFiles() {} + + private void writeSplit(Iterator<Pair<Writable, Writable>> inputIterator, + int numSplit, int numEntriesPerSplit) throws IOException { + SequenceFile.Writer splitWriter = null; + for (int j = 0; j < numEntriesPerSplit; ++j) { + Pair<Writable, Writable> item = inputIterator.next(); + if (splitWriter == null) { + splitWriter = SequenceFile.createWriter(fs, conf, + new Path(outputFileBase + "-" + numSplit), item.getFirst().getClass(), item.getSecond().getClass()); + } + splitWriter.append(item.getFirst(), item.getSecond()); + } + if (splitWriter != null) { + splitWriter.close(); + } + } + + private void run(PrintWriter printWriter) throws IOException { + conf = new Configuration(); + 
SequenceFileDirIterable<Writable, Writable> inputIterable = new + SequenceFileDirIterable<Writable, Writable>(new Path(inputFile), PathType.LIST, conf); + fs = FileSystem.get(conf); + + int numEntries = Iterables.size(inputIterable); + int numEntriesPerSplit = numEntries / numSplits; + int numEntriesLastSplit = numEntriesPerSplit + numEntries - numEntriesPerSplit * numSplits; + Iterator<Pair<Writable, Writable>> inputIterator = inputIterable.iterator(); + + printWriter.printf("Writing %d splits\n", numSplits); + for (int i = 0; i < numSplits - 1; ++i) { + printWriter.printf("Writing split %d\n", i); + writeSplit(inputIterator, i, numEntriesPerSplit); + } + printWriter.printf("Writing split %d\n", numSplits - 1); + writeSplit(inputIterator, numSplits - 1, numEntriesLastSplit); + } + + private boolean parseArgs(String[] args) { + DefaultOptionBuilder builder = new DefaultOptionBuilder(); + + Option help = builder.withLongName("help").withDescription("print this list").create(); + + ArgumentBuilder argumentBuilder = new ArgumentBuilder(); + Option inputFileOption = builder.withLongName("input") + .withShortName("i") + .withRequired(true) + .withArgument(argumentBuilder.withName("input").withMaximum(1).create()) + .withDescription("what the base folder for sequence files is (they all must have the same key/value type") + .create(); + + Option outputFileOption = builder.withLongName("output") + .withShortName("o") + .withRequired(true) + .withArgument(argumentBuilder.withName("output").withMaximum(1).create()) + .withDescription("the base name of the file split that the files will be split it; the i'th split has the " + + "suffix -i") + .create(); + + Option numSplitsOption = builder.withLongName("numSplits") + .withShortName("ns") + .withRequired(true) + .withArgument(argumentBuilder.withName("numSplits").withMaximum(1).create()) + .withDescription("how many splits to use for the given files") + .create(); + + Group normalArgs = new GroupBuilder() + .withOption(help) + 
.withOption(inputFileOption) + .withOption(outputFileOption) + .withOption(numSplitsOption) + .create(); + + Parser parser = new Parser(); + parser.setHelpOption(help); + parser.setHelpTrigger("--help"); + parser.setGroup(normalArgs); + parser.setHelpFormatter(new HelpFormatter(" ", "", " ", 130)); + CommandLine cmdLine = parser.parseAndHelp(args); + + if (cmdLine == null) { + return false; + } + + inputFile = (String) cmdLine.getValue(inputFileOption); + outputFileBase = (String) cmdLine.getValue(outputFileOption); + numSplits = Integer.parseInt((String) cmdLine.getValue(numSplitsOption)); + return true; + } + + public static void main(String[] args) throws IOException { + ResplitSequenceFiles runner = new ResplitSequenceFiles(); + if (runner.parseArgs(args)) { + runner.run(new PrintWriter(new OutputStreamWriter(System.out, Charsets.UTF_8), true)); + } + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/topdown/PathDirectory.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/clustering/topdown/PathDirectory.java b/mr/src/main/java/org/apache/mahout/clustering/topdown/PathDirectory.java new file mode 100644 index 0000000..11bc34a --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/clustering/topdown/PathDirectory.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.clustering.topdown; + +import java.io.File; + +import org.apache.hadoop.fs.Path; + +/** + * Contains list of all internal paths used in top down clustering. + */ +public final class PathDirectory { + + public static final String TOP_LEVEL_CLUSTER_DIRECTORY = "topLevelCluster"; + public static final String POST_PROCESS_DIRECTORY = "clusterPostProcessed"; + public static final String CLUSTERED_POINTS_DIRECTORY = "clusteredPoints"; + public static final String BOTTOM_LEVEL_CLUSTER_DIRECTORY = "bottomLevelCluster"; + + private PathDirectory() { + } + + /** + * All output of top level clustering is stored in output directory/topLevelCluster. + * + * @param output + * the output path of clustering. + * @return The top level Cluster Directory. + */ + public static Path getTopLevelClusterPath(Path output) { + return new Path(output + File.separator + TOP_LEVEL_CLUSTER_DIRECTORY); + } + + /** + * The output of top level clusters is post processed and kept in this path. + * + * @param outputPathProvidedByUser + * the output path of clustering. + * @return the path where the output of top level cluster post processor is kept. + */ + public static Path getClusterPostProcessorOutputDirectory(Path outputPathProvidedByUser) { + return new Path(outputPathProvidedByUser + File.separator + POST_PROCESS_DIRECTORY); + } + + /** + * The top level clustered points before post processing is generated here. + * + * @param output + * the output path of clustering. 
+ * @return the clustered points directory + */ + public static Path getClusterOutputClusteredPoints(Path output) { + return new Path(output + File.separator + CLUSTERED_POINTS_DIRECTORY + File.separator, "*"); + } + + /** + * Each cluster produced by top level clustering is processed in output/"bottomLevelCluster"/clusterId. + * + * @param output + * @param clusterId + * @return the bottom level clustering path. + */ + public static Path getBottomLevelClusterPath(Path output, String clusterId) { + return new Path(output + File.separator + BOTTOM_LEVEL_CLUSTER_DIRECTORY + File.separator + clusterId); + } + + /** + * Each clusters path name is its clusterId. The vectors reside in separate files inside it. + * + * @param clusterPostProcessorOutput + * the path of cluster post processor output. + * @param clusterId + * the id of the cluster. + * @return the cluster path for cluster id. + */ + public static Path getClusterPathForClusterId(Path clusterPostProcessorOutput, String clusterId) { + return new Path(clusterPostProcessorOutput + File.separator + clusterId); + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterCountReader.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterCountReader.java b/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterCountReader.java new file mode 100644 index 0000000..083b543 --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterCountReader.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
* The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.mahout.clustering.topdown.postprocessor;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.mahout.clustering.iterator.ClusterWritable;
import org.apache.mahout.common.iterator.sequencefile.PathFilters;
import org.apache.mahout.common.iterator.sequencefile.PathType;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterator;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/**
 * Reads the number of clusters produced by the clustering algorithm.
 */
public final class ClusterCountReader {

  private ClusterCountReader() {
  }

  /**
   * Reads the number of clusters present by reading the clusters-*-final file.
   *
   * @param clusterOutputPath The output path provided to the clustering algorithm.
   * @param conf The hadoop configuration.
   * @return the number of final clusters.
   * @throws IOException if no final clusters entry exists under {@code clusterOutputPath}, or reading fails.
   */
  public static int getNumberOfClusters(Path clusterOutputPath, Configuration conf) throws IOException {
    Iterator<?> it = new SequenceFileDirValueIterator<Writable>(findFinalClustersPath(clusterOutputPath, conf),
                                                                PathType.LIST,
                                                                PathFilters.partFilter(),
                                                                null,
                                                                true,
                                                                conf);
    int numberOfClusters = 0;
    while (it.hasNext()) {
      it.next();
      numberOfClusters++;
    }
    return numberOfClusters;
  }

  /**
   * Generates a mapping between final cluster ids and their positions by reading the clusters-*-final file.
   *
   * @param clusterOutputPath The output path provided to the clustering algorithm.
   * @param conf The hadoop configuration.
   * @param keyIsClusterId if {@code true} the map is cluster id to position; otherwise position to cluster id.
   *                       Positions are 0-based, in the order the clusters are read.
   * @return a Map between the final cluster ids and their positions, oriented as selected by
   *         {@code keyIsClusterId}.
   * @throws IOException if no final clusters entry exists under {@code clusterOutputPath}, or reading fails.
   */
  public static Map<Integer, Integer> getClusterIDs(Path clusterOutputPath, Configuration conf, boolean keyIsClusterId)
    throws IOException {
    Map<Integer, Integer> clusterIds = new HashMap<>();
    Iterator<ClusterWritable> it = new SequenceFileDirValueIterator<>(findFinalClustersPath(clusterOutputPath, conf),
                                                                      PathType.LIST,
                                                                      PathFilters.partFilter(),
                                                                      null,
                                                                      true,
                                                                      conf);
    int index = 0;
    while (it.hasNext()) {
      int clusterId = it.next().getValue().getId();
      if (keyIsClusterId) {
        clusterIds.put(clusterId, index);
      } else {
        clusterIds.put(index, clusterId);
      }
      index++;
    }
    return clusterIds;
  }

  /**
   * Returns the first entry under {@code clusterOutputPath} accepted by {@link PathFilters#finalPartFilter()}.
   *
   * @throws IOException if there is no such entry.
   */
  private static Path findFinalClustersPath(Path clusterOutputPath, Configuration conf) throws IOException {
    FileSystem fileSystem = clusterOutputPath.getFileSystem(conf);
    FileStatus[] clusterFiles = fileSystem.listStatus(clusterOutputPath, PathFilters.finalPartFilter());
    // listStatus may return null (older Hadoop releases) instead of throwing for a missing directory.
    if (clusterFiles == null || clusterFiles.length == 0) {
      throw new IOException("No final clusters (clusters-*-final) found in " + clusterOutputPath);
    }
    return clusterFiles[0].getPath();
  }

}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessor.java
---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessor.java b/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessor.java new file mode 100644 index 0000000..44a944d --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessor.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.clustering.topdown.postprocessor; + +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.SequenceFile.Writer; +import org.apache.hadoop.io.Writable; +import org.apache.mahout.clustering.classify.WeightedVectorWritable; +import org.apache.mahout.clustering.topdown.PathDirectory; +import org.apache.mahout.common.IOUtils; +import org.apache.mahout.common.Pair; +import org.apache.mahout.common.iterator.sequencefile.PathFilters; +import org.apache.mahout.common.iterator.sequencefile.PathType; +import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable; +import org.apache.mahout.math.VectorWritable; + +import java.io.IOException; +import java.util.Map; + +/** + * This class reads the output of any clustering algorithm, and, creates separate directories for different + * clusters. Each cluster directory's name is its clusterId. Each and every point is written in the cluster + * directory associated with that point. + * <p/> + * This class incorporates a sequential algorithm and is appropriate for use for data which has been clustered + * sequentially. + * <p/> + * The sequential and non sequential version, both are being used from {@link ClusterOutputPostProcessorDriver}. 
+ */ +public final class ClusterOutputPostProcessor { + + private Path clusteredPoints; + private final FileSystem fileSystem; + private final Configuration conf; + private final Path clusterPostProcessorOutput; + private final Map<String, Path> postProcessedClusterDirectories = Maps.newHashMap(); + private long uniqueVectorId = 0L; + private final Map<String, SequenceFile.Writer> writersForClusters; + + public ClusterOutputPostProcessor(Path clusterOutputToBeProcessed, + Path output, + Configuration hadoopConfiguration) throws IOException { + this.clusterPostProcessorOutput = output; + this.clusteredPoints = PathDirectory.getClusterOutputClusteredPoints(clusterOutputToBeProcessed); + this.conf = hadoopConfiguration; + this.writersForClusters = Maps.newHashMap(); + fileSystem = clusteredPoints.getFileSystem(conf); + } + + /** + * This method takes the clustered points output by the clustering algorithms as input and writes them into + * their respective clusters. + */ + public void process() throws IOException { + createPostProcessDirectory(); + for (Pair<?, WeightedVectorWritable> record + : new SequenceFileDirIterable<Writable, WeightedVectorWritable>(clusteredPoints, PathType.GLOB, PathFilters.partFilter(), + null, false, conf)) { + String clusterId = record.getFirst().toString().trim(); + putVectorInRespectiveCluster(clusterId, record.getSecond()); + } + IOUtils.close(writersForClusters.values()); + writersForClusters.clear(); + } + + /** + * Creates the directory to put post processed clusters. + */ + private void createPostProcessDirectory() throws IOException { + if (!fileSystem.exists(clusterPostProcessorOutput) + && !fileSystem.mkdirs(clusterPostProcessorOutput)) { + throw new IOException("Error creating cluster post processor directory"); + } + } + + /** + * Finds out the cluster directory of the vector and writes it into the specified cluster. 
+ */ + private void putVectorInRespectiveCluster(String clusterId, WeightedVectorWritable point) throws IOException { + Writer writer = findWriterForVector(clusterId); + postProcessedClusterDirectories.put(clusterId, + PathDirectory.getClusterPathForClusterId(clusterPostProcessorOutput, clusterId)); + writeVectorToCluster(writer, point); + } + + /** + * Finds out the path in cluster where the point is supposed to be written. + */ + private Writer findWriterForVector(String clusterId) throws IOException { + Path clusterDirectory = PathDirectory.getClusterPathForClusterId(clusterPostProcessorOutput, clusterId); + Writer writer = writersForClusters.get(clusterId); + if (writer == null) { + Path pathToWrite = new Path(clusterDirectory, new Path("part-m-0")); + writer = new Writer(fileSystem, conf, pathToWrite, LongWritable.class, VectorWritable.class); + writersForClusters.put(clusterId, writer); + } + return writer; + } + + /** + * Writes vector to the cluster directory. + */ + private void writeVectorToCluster(Writer writer, WeightedVectorWritable point) throws IOException { + writer.append(new LongWritable(uniqueVectorId++), new VectorWritable(point.getVector())); + writer.sync(); + } + + /** + * @return the set of all post processed cluster paths. 
+ */ + public Map<String, Path> getPostProcessedClusterDirectories() { + return postProcessedClusterDirectories; + } + + public void setClusteredPoints(Path clusteredPoints) { + this.clusteredPoints = clusteredPoints; + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorDriver.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorDriver.java b/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorDriver.java new file mode 100644 index 0000000..82a3071 --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorDriver.java @@ -0,0 +1,182 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.clustering.topdown.postprocessor; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.hadoop.util.ToolRunner; +import org.apache.mahout.common.AbstractJob; +import org.apache.mahout.common.HadoopUtil; +import org.apache.mahout.common.commandline.DefaultOptionCreator; +import org.apache.mahout.common.iterator.sequencefile.PathFilters; +import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterator; +import org.apache.mahout.math.VectorWritable; + +import java.io.IOException; + +/** + * Post processes the output of clustering algorithms and groups them into respective clusters. Ideal to be + * used for top down clustering. It can also be used if the clustering output needs to be grouped into their + * respective clusters. + */ +public final class ClusterOutputPostProcessorDriver extends AbstractJob { + + /** + * CLI to run clustering post processor. The input to the post processor is the output path specified to the + * clustering.
+ */ + @Override + public int run(String[] args) throws Exception { + addInputOption(); + addOutputOption(); + addOption(DefaultOptionCreator.methodOption().create()); + addOption(DefaultOptionCreator.overwriteOption().create()); + + if (parseArguments(args) == null) { + return -1; + } + Path input = getInputPath(); + Path output = getOutputPath(); + + if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) { + HadoopUtil.delete(getConf(), output); + } + boolean runSequential = getOption(DefaultOptionCreator.METHOD_OPTION).equalsIgnoreCase( + DefaultOptionCreator.SEQUENTIAL_METHOD); + run(input, output, runSequential); + return 0; + + } + + public static void main(String[] args) throws Exception { + ToolRunner.run(new Configuration(), new ClusterOutputPostProcessorDriver(), args); + } + + /** + * Post processes the output of clustering algorithms and groups them into respective clusters. Each + * cluster's vectors are written into a directory named after its clusterId. + * + * @param input The output path provided to the clustering algorithm, which will be post processed. Hint: The + * path of the directory containing clusters-*-final and clusteredPoints. + * @param output The post processed data will be stored at this path. + * @param runSequential If set to true, post processes it sequentially, else uses MapReduce. Hint: If the clustering + * was done sequentially, make it sequential, else vice versa. + */ + public static void run(Path input, Path output, boolean runSequential) throws IOException, + InterruptedException, + ClassNotFoundException { + if (runSequential) { + postProcessSeq(input, output); + } else { + Configuration conf = new Configuration(); + postProcessMR(conf, input, output); + movePartFilesToRespectiveDirectories(conf, output); + } + + } + + /** + * Process sequentially. Reads the vectors one by one, and puts them into their respective directories, named after + * their clusterId.
+ * + * @param input The output path provided to the clustering algorithm, which will be post processed. Hint: The + * path of the directory containing clusters-*-final and clusteredPoints. + * @param output The post processed data will be stored at this path. + */ + private static void postProcessSeq(Path input, Path output) throws IOException { + ClusterOutputPostProcessor clusterOutputPostProcessor = new ClusterOutputPostProcessor(input, output, + new Configuration()); + clusterOutputPostProcessor.process(); + } + + /** + * Process as a map reduce job. The numberOfReduceTasks is set to the number of clusters present in the + * output, so that each cluster's vectors are written to their own part file. + * + * @param conf The hadoop configuration. + * @param input The output path provided to the clustering algorithm, which will be post processed. Hint: The + * path of the directory containing clusters-*-final and clusteredPoints. + * @param output The post processed data will be stored at this path.
+ */ + private static void postProcessMR(Configuration conf, Path input, Path output) throws IOException, + InterruptedException, + ClassNotFoundException { + System.out.println("WARNING: If you are running in Hadoop local mode, please use the --sequential option, " + + "as the MapReduce option will not work properly"); + int numberOfClusters = ClusterCountReader.getNumberOfClusters(input, conf); + conf.set("clusterOutputPath", input.toString()); + Job job = new Job(conf, "ClusterOutputPostProcessor Driver running over input: " + input); + job.setInputFormatClass(SequenceFileInputFormat.class); + job.setOutputFormatClass(SequenceFileOutputFormat.class); + job.setMapperClass(ClusterOutputPostProcessorMapper.class); + job.setMapOutputKeyClass(IntWritable.class); + job.setMapOutputValueClass(VectorWritable.class); + job.setReducerClass(ClusterOutputPostProcessorReducer.class); + job.setOutputKeyClass(IntWritable.class); + job.setOutputValueClass(VectorWritable.class); + job.setNumReduceTasks(numberOfClusters); + job.setJarByClass(ClusterOutputPostProcessorDriver.class); + + FileInputFormat.addInputPath(job, new Path(input, new Path("clusteredPoints"))); + FileOutputFormat.setOutputPath(job, output); + if (!job.waitForCompletion(true)) { + throw new InterruptedException("ClusterOutputPostProcessor Job failed processing " + input); + } + } + + /** + * The mapreduce version of the post processor writes different clusters into different part files. This + * method reads the part files and moves them into directories named after their clusterIds. + * + * @param conf The hadoop configuration. + * @param output The post processed data will be stored at this path.
+ */ + private static void movePartFilesToRespectiveDirectories(Configuration conf, Path output) throws IOException { + FileSystem fileSystem = output.getFileSystem(conf); + for (FileStatus fileStatus : fileSystem.listStatus(output, PathFilters.partFilter())) { + try (SequenceFileIterator<Writable, Writable> it = + new SequenceFileIterator<>(fileStatus.getPath(), true, conf)) { + if (it.hasNext()) { + renameFile(it.next().getFirst(), fileStatus, conf); + } + } + } + } + + /** + * Moves the part file into a sub-directory of its parent named after {@code key}, via {@link FileSystem#rename}. + */ + private static void renameFile(Writable key, FileStatus fileStatus, Configuration conf) throws IOException { + Path path = fileStatus.getPath(); + FileSystem fileSystem = path.getFileSystem(conf); + Path subDir = new Path(key.toString()); + Path renameTo = new Path(path.getParent(), subDir); + fileSystem.mkdirs(renameTo); + fileSystem.rename(path, renameTo); + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorMapper.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorMapper.java b/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorMapper.java new file mode 100644 index 0000000..6834362 --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorMapper.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.clustering.topdown.postprocessor; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.mahout.clustering.classify.WeightedVectorWritable; +import org.apache.mahout.math.VectorWritable; + +import java.io.IOException; +import java.util.Map; + +/** + * Mapper for post processing cluster output. + */ +public class ClusterOutputPostProcessorMapper extends + Mapper<IntWritable, WeightedVectorWritable, IntWritable, VectorWritable> { + + private Map<Integer, Integer> newClusterMappings; + private VectorWritable outputVector; + + //read the current cluster ids, and populate the cluster mapping hash table + @Override + public void setup(Context context) throws IOException { + Configuration conf = context.getConfiguration(); + //this gives the directory containing clusters-*-final, where the cluster ids can be read + Path clusterOutputPath = new Path(conf.get("clusterOutputPath")); + //we want the key to be the cluster id, the value to be the index + newClusterMappings = ClusterCountReader.getClusterIDs(clusterOutputPath, conf, true); + outputVector = new VectorWritable(); + } + + @Override + public void map(IntWritable key, WeightedVectorWritable val, Context context) + throws IOException, InterruptedException { + // by pivoting on the cluster mapping value, we can make sure that each unique cluster goes to its own reducer, + // since they are numbered from 0 to k-1, where k is the number of clusters +
outputVector.set(val.getVector()); + context.write(new IntWritable(newClusterMappings.get(key.get())), outputVector); + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorReducer.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorReducer.java b/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorReducer.java new file mode 100644 index 0000000..58dada4 --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterOutputPostProcessorReducer.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.clustering.topdown.postprocessor; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.mahout.math.VectorWritable; + +import java.io.IOException; +import java.util.Map; + +/** + * Reducer for post processing cluster output.
+ */ +public class ClusterOutputPostProcessorReducer + extends Reducer<IntWritable, VectorWritable, IntWritable, VectorWritable> { + + private Map<Integer, Integer> reverseClusterMappings; + + //read the current cluster ids, and populate the reverse cluster mapping hash table + @Override + public void setup(Context context) throws IOException { + Configuration conf = context.getConfiguration(); + Path clusterOutputPath = new Path(conf.get("clusterOutputPath")); + //we want the key to be the index, the value to be the cluster id + reverseClusterMappings = ClusterCountReader.getClusterIDs(clusterOutputPath, conf, false); + } + + /** + * The key is the remapped cluster id and the values contain the vectors in that cluster. + */ + @Override + protected void reduce(IntWritable key, Iterable<VectorWritable> values, Context context) throws IOException, + InterruptedException { + //remap the cluster back to its original id + //and then output the vectors with their correct + //cluster id. + IntWritable outKey = new IntWritable(reverseClusterMappings.get(key.get())); + for (VectorWritable value : values) { + context.write(outKey, value); + } + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/AbstractJob.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/common/AbstractJob.java b/mr/src/main/java/org/apache/mahout/common/AbstractJob.java new file mode 100644 index 0000000..ec77749 --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/common/AbstractJob.java @@ -0,0 +1,658 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.common; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicInteger; + +import com.google.common.collect.Lists; +import com.google.common.io.Closeables; +import org.apache.commons.cli2.CommandLine; +import org.apache.commons.cli2.Group; +import org.apache.commons.cli2.Option; +import org.apache.commons.cli2.OptionException; +import org.apache.commons.cli2.builder.ArgumentBuilder; +import org.apache.commons.cli2.builder.DefaultOptionBuilder; +import org.apache.commons.cli2.builder.GroupBuilder; +import org.apache.commons.cli2.commandline.Parser; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import
org.apache.hadoop.util.Tool; +import org.apache.lucene.analysis.Analyzer; +import org.apache.mahout.common.commandline.DefaultOptionCreator; +import org.apache.mahout.common.lucene.AnalyzerUtils; +import org.apache.mahout.math.VectorWritable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import org.apache.lucene.analysis.standard.StandardAnalyzer; + +/** + * <p>Superclass of many Mahout Hadoop "jobs". A job drives configuration and launch of one or + * more maps and reduces in order to accomplish some task.</p> + * + * <p>Command line arguments available to all subclasses are:</p> + * + * <ul> + * <li>--tempDir (path): Specifies a directory where the job may place temp files + * (default "temp")</li> + * <li>--help: Show help message</li> + * </ul> + * + * <p>In addition, note some key command line parameters that are parsed by Hadoop, which jobs + * may need to set:</p> + * + * <ul> + * <li>-Dmapred.job.name=(name): Sets the Hadoop task names.
It will be suffixed by + * the mapper and reducer class names</li> + * <li>-Dmapred.output.compress={true,false}: Compress final output (default true)</li> + * <li>-Dmapred.input.dir=(path): input file, or directory containing input files (required)</li> + * <li>-Dmapred.output.dir=(path): path to write output files (required)</li> + * </ul> + * + * <p>Note that because of how Hadoop parses arguments, all "-D" arguments must appear before all other + * arguments.</p> + */ +public abstract class AbstractJob extends Configured implements Tool { + + private static final Logger log = LoggerFactory.getLogger(AbstractJob.class); + + /** option used to specify the input path */ + private Option inputOption; + + /** option used to specify the output path */ + private Option outputOption; + + /** input path, populated by {@link #parseArguments(String[])} */ + protected Path inputPath; + protected File inputFile; //the input represented as a file + + /** output path, populated by {@link #parseArguments(String[])} */ + protected Path outputPath; + protected File outputFile; //the output represented as a file + + /** temp path, populated by {@link #parseArguments(String[])} */ + protected Path tempPath; + + protected Map<String, List<String>> argMap; + + /** internal list of options that have been added */ + private final List<Option> options; + private Group group; + + protected AbstractJob() { + options = Lists.newLinkedList(); + } + + /** Returns the input path established by a call to {@link #parseArguments(String[])}. + * The source of the path may be an input option added using {@link #addInputOption()} + * or it may be the value of the {@code mapred.input.dir} configuration + * property. + */ + protected Path getInputPath() { + return inputPath; + } + + /** Returns the output path established by a call to {@link #parseArguments(String[])}.
+ * The source of the path may be an output option added using {@link #addOutputOption()} + * or it may be the value of the {@code mapred.output.dir} configuration + * property. + */ + protected Path getOutputPath() { + return outputPath; + } + + protected Path getOutputPath(String path) { + return new Path(outputPath, path); + } + + protected File getInputFile() { + return inputFile; + } + + protected File getOutputFile() { + return outputFile; + } + + + protected Path getTempPath() { + return tempPath; + } + + protected Path getTempPath(String directory) { + return new Path(tempPath, directory); + } + + @Override + public Configuration getConf() { + Configuration result = super.getConf(); + if (result == null) { + return new Configuration(); + } + return result; + } + + /** Add an option with no argument whose presence can be checked for using + * {@code containsKey} method on the map returned by {@link #parseArguments(String[])}. + */ + protected void addFlag(String name, String shortName, String description) { + options.add(buildOption(name, shortName, description, false, false, null)); + } + + /** Add an option to the set of options this job will parse when + * {@link #parseArguments(String[])} is called. This option has an argument + * with null as its default value. + */ + protected void addOption(String name, String shortName, String description) { + options.add(buildOption(name, shortName, description, true, false, null)); + } + + /** Add an option to the set of options this job will parse when + * {@link #parseArguments(String[])} is called. + * + * @param required if true, {@link #parseArguments(String[])} will + * fail with an error and usage message if this option is not specified + * on the command line.
+ */ + protected void addOption(String name, String shortName, String description, boolean required) { + options.add(buildOption(name, shortName, description, true, required, null)); + } + + /** Add an option to the set of options this job will parse when + * {@link #parseArguments(String[])} is called. If this option is not + * specified on the command line the default value will be + * used. + * + * @param defaultValue the default argument value if this argument is not + * found on the command-line. null is allowed. + */ + protected void addOption(String name, String shortName, String description, String defaultValue) { + options.add(buildOption(name, shortName, description, true, false, defaultValue)); + } + + /** Add an arbitrary option to the set of options this job will parse when + * {@link #parseArguments(String[])} is called. If this option has no + * argument, use {@code containsKey} on the map returned by + * {@code parseArguments} to check for its presence. Otherwise, the + * string values of the option will be placed in the map using a key + * equal to this option's long name preceded by '--'. + * @return the option added. + */ + protected Option addOption(Option option) { + options.add(option); + return option; + } + + protected Group getGroup() { + return group; + } + + /** Add the default input directory option, '-i' which takes a directory + * name as an argument. When {@link #parseArguments(String[])} is + * called, the inputPath will be set based upon the value for this option. + * If this method is called, the input is required. + */ + protected void addInputOption() { + this.inputOption = addOption(DefaultOptionCreator.inputOption().create()); + } + + /** Add the default output directory option, '-o' which takes a directory + * name as an argument. When {@link #parseArguments(String[])} is + * called, the outputPath will be set based upon the value for this option. + * If this method is called, the output is required.
+ */ + protected void addOutputOption() { + this.outputOption = addOption(DefaultOptionCreator.outputOption().create()); + } + + /** Build an option with the given parameters. Name and description are + * required. + * + * @param name the long name of the option prefixed with '--' on the command-line + * @param shortName the short name of the option, prefixed with '-' on the command-line + * @param description description of the option displayed in help method + * @param hasArg true if the option has an argument. + * @param required true if the option is required. + * @param defaultValue default argument value, can be null. + * @return the option. + */ + protected static Option buildOption(String name, + String shortName, + String description, + boolean hasArg, + boolean required, + String defaultValue) { + + return buildOption(name, shortName, description, hasArg, 1, 1, required, defaultValue); + } + + protected static Option buildOption(String name, + String shortName, + String description, + boolean hasArg, int min, int max, + boolean required, + String defaultValue) { + + DefaultOptionBuilder optBuilder = new DefaultOptionBuilder().withLongName(name).withDescription(description) + .withRequired(required); + + if (shortName != null) { + optBuilder.withShortName(shortName); + } + + if (hasArg) { + ArgumentBuilder argBuilder = new ArgumentBuilder().withName(name).withMinimum(min).withMaximum(max); + + if (defaultValue != null) { + argBuilder = argBuilder.withDefault(defaultValue); + } + + optBuilder.withArgument(argBuilder.create()); + } + + return optBuilder.create(); + } + + /** + * @param name The name of the option + * @return the {@link org.apache.commons.cli2.Option} with the name, else null + */ + protected Option getCLIOption(String name) { + for (Option option : options) { + if (option.getPreferredName().equals(name)) { + return option; + } + } + return null; + } + + /** Parse the arguments specified based on the options defined using the + * various
{@code addOption} methods. If -h is specified or an + * exception is encountered print help and return null. Has the + * side effect of setting inputPath and outputPath + * if {@code addInputOption} or {@code addOutputOption} + * or {@code mapred.input.dir} or {@code mapred.output.dir} + * are present in the Configuration. + * + * @return a {@code Map<String,List<String>>} containing options and their argument values. + * The presence of a flag can be tested using {@code containsKey}, while + * argument values can be retrieved using {@code get(optionName)}. The + * names used for keys are the option name parameter prefixed by '--'. + * + * @see #parseArguments(String[], boolean, boolean) -- passes in false, false for the optional args. + */ + public Map<String, List<String>> parseArguments(String[] args) throws IOException { + return parseArguments(args, false, false); + } + + /** + * Parses {@code args}, optionally allowing the input and/or output paths to be absent. + * @param args The args to parse + * @param inputOptional if true, then the input option, if set, need not be present. If false and input is an option + * and there is no input, then an error is logged, usage is printed and null is returned + * @param outputOptional if true, then the output option, if set, need not be present. If false and output is an + * option and there is no output, then an error is logged, usage is printed and null is returned + * @return the args parsed into a map, or null if help was requested or parsing failed.
+ */ + public Map<String, List<String>> parseArguments(String[] args, boolean inputOptional, boolean outputOptional) + throws IOException { + Option helpOpt = addOption(DefaultOptionCreator.helpOption()); + addOption("tempDir", null, "Intermediate output directory", "temp"); + addOption("startPhase", null, "First phase to run", "0"); + addOption("endPhase", null, "Last phase to run", String.valueOf(Integer.MAX_VALUE)); + + GroupBuilder gBuilder = new GroupBuilder().withName("Job-Specific Options:"); + + for (Option opt : options) { + gBuilder = gBuilder.withOption(opt); + } + + group = gBuilder.create(); + + CommandLine cmdLine; + try { + Parser parser = new Parser(); + parser.setGroup(group); + parser.setHelpOption(helpOpt); + cmdLine = parser.parse(args); + + } catch (OptionException e) { + log.error(e.getMessage()); + CommandLineUtil.printHelpWithGenericOptions(group, e); + return null; + } + + if (cmdLine.hasOption(helpOpt)) { + CommandLineUtil.printHelpWithGenericOptions(group); + return null; + } + + try { + parseDirectories(cmdLine, inputOptional, outputOptional); + } catch (IllegalArgumentException e) { + log.error(e.getMessage()); + CommandLineUtil.printHelpWithGenericOptions(group); + return null; + } + + argMap = new TreeMap<String, List<String>>(); + maybePut(argMap, cmdLine, this.options.toArray(new Option[this.options.size()])); + + this.tempPath = new Path(getOption("tempDir")); + + if (!hasOption("quiet")) { + log.info("Command line arguments: {}", argMap); + } + return argMap; + } + + /** + * Build the option key (--name) from the option name + */ + public static String keyFor(String optionName) { + return "--" + optionName; + } + + /** + * @return the requested option, or null if it has not been specified + */ + public String getOption(String optionName) { + List<String> list = argMap.get(keyFor(optionName)); + if (list != null && !list.isEmpty()) { + return list.get(0); + } + return null; + } + + /** + * Get the option, else the default + *
@param optionName The name of the option to look up, without the -- + * @param defaultVal The default value. + * @return The requested option, else the default value if it doesn't exist + */ + public String getOption(String optionName, String defaultVal) { + String res = getOption(optionName); + if (res == null) { + res = defaultVal; + } + return res; + } + + public int getInt(String optionName) { + return Integer.parseInt(getOption(optionName)); + } + + public int getInt(String optionName, int defaultVal) { + return Integer.parseInt(getOption(optionName, String.valueOf(defaultVal))); + } + + public float getFloat(String optionName) { + return Float.parseFloat(getOption(optionName)); + } + + public float getFloat(String optionName, float defaultVal) { + return Float.parseFloat(getOption(optionName, String.valueOf(defaultVal))); + } + + /** + * Options can occur multiple times, so return the list + * @param optionName The unadorned (no "--" prefixing it) option name + * @return The values, else null.
Note that an option that is present but has no values (e.g. a flag) + * also maps to null; use {@link #hasOption(String)} to test for presence + */ + public List<String> getOptions(String optionName) { + return argMap.get(keyFor(optionName)); + } + + /** + * @return if the requested option has been specified + */ + public boolean hasOption(String optionName) { + return argMap.containsKey(keyFor(optionName)); + } + + + /** + * Get the cardinality of the input vectors, as read from the first row of {@code matrix} + * + * @param matrix path of a SequenceFile whose values are {@link VectorWritable}s + * @return the cardinality of the first vector + */ + public int getDimensions(Path matrix) throws IOException { + + SequenceFile.Reader reader = null; + try { + reader = new SequenceFile.Reader(matrix.getFileSystem(getConf()), matrix, getConf()); + + Writable row = ClassUtils.instantiateAs(reader.getKeyClass().asSubclass(Writable.class), Writable.class); + + Preconditions.checkArgument(reader.getValueClass().equals(VectorWritable.class), + "value type of sequencefile must be a VectorWritable"); + + VectorWritable vectorWritable = new VectorWritable(); + boolean hasAtLeastOneRow = reader.next(row, vectorWritable); + Preconditions.checkState(hasAtLeastOneRow, "matrix must have at least one row"); + + return vectorWritable.get().size(); + + } finally { + Closeables.close(reader, true); + } + } + + /** + * Obtain input and output directories from command-line options or hadoop + * properties. If {@code addInputOption} or {@code addOutputOption} + * has been called, this method will throw an {@code IllegalArgumentException} if + * no source (command-line or property) for that value is present, unless the matching *Optional flag is true. + * Otherwise, {@code inputPath} or {@code outputPath} will be + * non-null only if specified as a hadoop property. Command-line options + * take precedence over hadoop properties. 
+ * + * @throws IllegalArgumentException if either inputOption is present, + * and neither {@code --input} nor {@code -Dmapred.input.dir} are + * specified or outputOption is present and neither {@code --output} + * nor {@code -Dmapred.output.dir} are specified.
+ */ + protected void parseDirectories(CommandLine cmdLine, boolean inputOptional, boolean outputOptional) { + + Configuration conf = getConf(); + + if (inputOption != null && cmdLine.hasOption(inputOption)) { + this.inputPath = new Path(cmdLine.getValue(inputOption).toString()); + this.inputFile = new File(cmdLine.getValue(inputOption).toString()); + } + if (inputPath == null && conf.get("mapred.input.dir") != null) { + this.inputPath = new Path(conf.get("mapred.input.dir")); + } + + if (outputOption != null && cmdLine.hasOption(outputOption)) { + this.outputPath = new Path(cmdLine.getValue(outputOption).toString()); + this.outputFile = new File(cmdLine.getValue(outputOption).toString()); + } + if (outputPath == null && conf.get("mapred.output.dir") != null) { + this.outputPath = new Path(conf.get("mapred.output.dir")); + } + + Preconditions.checkArgument(inputOptional || inputOption == null || inputPath != null, + "No input specified or -Dmapred.input.dir must be provided to specify input directory"); + Preconditions.checkArgument(outputOptional || outputOption == null || outputPath != null, + "No output specified: or -Dmapred.output.dir must be provided to specify output directory"); + } + + protected static void maybePut(Map<String, List<String>> args, CommandLine cmdLine, Option... opt) { + for (Option o : opt) { + + // the option appeared on the command-line, or it has a value + // (which is likely a default value). + if (cmdLine.hasOption(o) || cmdLine.getValue(o) != null + || (cmdLine.getValues(o) != null && !cmdLine.getValues(o).isEmpty())) { + + // nulls are ok, for cases where options are simple flags.
+ List<?> vo = cmdLine.getValues(o); + if (vo != null && !vo.isEmpty()) { + List<String> vals = Lists.newArrayList(); + for (Object o1 : vo) { + vals.add(o1.toString()); + } + args.put(o.getPreferredName(), vals); + } else { + args.put(o.getPreferredName(), null); + } + } + } + } + + /** + * Looks up the first value of an option in an already-parsed argument map. + * @param args The input argument map + * @param optName The adorned (including "--") option name + * @return The first value in the match, else null + */ + public static String getOption(Map<String, List<String>> args, String optName) { + List<String> res = args.get(optName); + if (res != null && !res.isEmpty()) { + return res.get(0); + } + return null; + } + + + protected static boolean shouldRunNextPhase(Map<String, List<String>> args, AtomicInteger currentPhase) { + int phase = currentPhase.getAndIncrement(); + String startPhase = getOption(args, "--startPhase"); + String endPhase = getOption(args, "--endPhase"); + boolean phaseSkipped = (startPhase != null && phase < Integer.parseInt(startPhase)) + || (endPhase != null && phase > Integer.parseInt(endPhase)); + if (phaseSkipped) { + log.info("Skipping phase {}", phase); + } + return !phaseSkipped; + } + + protected Job prepareJob(Path inputPath, + Path outputPath, + Class<? extends InputFormat> inputFormat, + Class<? extends Mapper> mapper, + Class<? extends Writable> mapperKey, + Class<? extends Writable> mapperValue, + Class<? extends OutputFormat> outputFormat) throws IOException { + return prepareJob(inputPath, outputPath, inputFormat, mapper, mapperKey, mapperValue, outputFormat, null); + + } + protected Job prepareJob(Path inputPath, + Path outputPath, + Class<? extends InputFormat> inputFormat, + Class<? extends Mapper> mapper, + Class<? extends Writable> mapperKey, + Class<? extends Writable> mapperValue, + Class<?
extends OutputFormat> outputFormat, + String jobname) throws IOException { + + Job job = HadoopUtil.prepareJob(inputPath, outputPath, + inputFormat, mapper, mapperKey, mapperValue, outputFormat, getConf()); + + String name = + jobname != null ? jobname : HadoopUtil.getCustomJobName(getClass().getSimpleName(), job, mapper, Reducer.class); + + job.setJobName(name); + return job; + + } + + protected Job prepareJob(Path inputPath, Path outputPath, Class<? extends Mapper> mapper, + Class<? extends Writable> mapperKey, Class<? extends Writable> mapperValue, Class<? extends Reducer> reducer, + Class<? extends Writable> reducerKey, Class<? extends Writable> reducerValue) throws IOException { + return prepareJob(inputPath, outputPath, SequenceFileInputFormat.class, mapper, mapperKey, mapperValue, reducer, + reducerKey, reducerValue, SequenceFileOutputFormat.class); + } + + protected Job prepareJob(Path inputPath, + Path outputPath, + Class<? extends InputFormat> inputFormat, + Class<? extends Mapper> mapper, + Class<? extends Writable> mapperKey, + Class<? extends Writable> mapperValue, + Class<? extends Reducer> reducer, + Class<? extends Writable> reducerKey, + Class<? extends Writable> reducerValue, + Class<?
extends OutputFormat> outputFormat) throws IOException { + Job job = HadoopUtil.prepareJob(inputPath, outputPath, + inputFormat, mapper, mapperKey, mapperValue, reducer, reducerKey, reducerValue, outputFormat, getConf()); + job.setJobName(HadoopUtil.getCustomJobName(getClass().getSimpleName(), job, mapper, reducer)); + return job; + } + + /** + * necessary to make this job (having a combined input path) work on Amazon S3, hopefully this is + * obsolete when MultipleInputs is available again + */ + public static void setS3SafeCombinedInputPath(Job job, Path referencePath, Path inputPathOne, Path inputPathTwo) + throws IOException { + FileSystem fs = FileSystem.get(referencePath.toUri(), job.getConfiguration()); + FileInputFormat.setInputPaths(job, inputPathOne.makeQualified(fs), inputPathTwo.makeQualified(fs)); + } + + protected Class<? extends Analyzer> getAnalyzerClassFromOption() throws ClassNotFoundException { + Class<? extends Analyzer> analyzerClass = StandardAnalyzer.class; + if (hasOption(DefaultOptionCreator.ANALYZER_NAME_OPTION)) { + String className = getOption(DefaultOptionCreator.ANALYZER_NAME_OPTION); + analyzerClass = Class.forName(className).asSubclass(Analyzer.class); + // try instantiating it, b/c there isn't any point in setting it if + // you can't instantiate it + //ClassUtils.instantiateAs(analyzerClass, Analyzer.class); + AnalyzerUtils.createAnalyzer(analyzerClass); + } + return analyzerClass; + } + + /** + * Overrides the base implementation to install the Oozie action configuration resource + * into the provided Configuration object; note that ToolRunner calls setConf on the Tool + * before it invokes run. + */ + @Override + public void setConf(Configuration conf) { + super.setConf(conf); + + // If running in an Oozie workflow as a Java action, need to add the + // Configuration resource provided by Oozie to this job's config.
+ String oozieActionConfXml = System.getProperty("oozie.action.conf.xml"); + if (oozieActionConfXml != null && conf != null) { + conf.addResource(new Path("file:///", oozieActionConfXml)); + log.info("Added Oozie action Configuration resource {} to the Hadoop Configuration", oozieActionConfXml); + } + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/ClassUtils.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/common/ClassUtils.java b/mr/src/main/java/org/apache/mahout/common/ClassUtils.java new file mode 100644 index 0000000..8052ef1 --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/common/ClassUtils.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.mahout.common; + +import java.lang.reflect.InvocationTargetException; + /** Reflection helpers that load and instantiate classes, either by name or from a {@code Class} object, converting checked reflection failures into {@link IllegalStateException}. */ +public final class ClassUtils { + /** Not instantiable: the class exposes only static methods. */ + private ClassUtils() {} + /** Loads {@code classname} with {@link Class#forName(String)}, checks that it is a subtype of {@code asSubclassOfClass}, and instantiates it through its public no-argument constructor. @param classname fully qualified name of the class to load @param asSubclassOfClass type the loaded class must extend or implement @return the new instance, typed as {@code T} @throws IllegalStateException if the class cannot be found (cause: the {@link ClassNotFoundException}) or cannot be instantiated @throws ClassCastException if the loaded class is not a subtype of {@code asSubclassOfClass}; thrown by {@link Class#asSubclass(Class)} and not wrapped */ + public static <T> T instantiateAs(String classname, Class<T> asSubclassOfClass) { + try { + return instantiateAs(Class.forName(classname).asSubclass(asSubclassOfClass), asSubclassOfClass); + } catch (ClassNotFoundException e) { + throw new IllegalStateException(e); + } + } + /** Loads {@code classname} with {@link Class#forName(String)}, checks that it is a subtype of {@code asSubclassOfClass}, and instantiates it through the public constructor whose parameter types are {@code params}, passing {@code args}. @param classname fully qualified name of the class to load @param asSubclassOfClass type the loaded class must extend or implement @param params constructor parameter types, in declaration order @param args constructor arguments, matching {@code params} @return the new instance, typed as {@code T} @throws IllegalStateException if the class cannot be found or the constructor cannot be located or invoked @throws ClassCastException if the loaded class is not a subtype of {@code asSubclassOfClass} (not wrapped) @throws IllegalArgumentException if {@code args} do not fit {@code params}; thrown by Constructor#newInstance and not wrapped */ + public static <T> T instantiateAs(String classname, Class<T> asSubclassOfClass, Class<?>[] params, Object[] args) { + try { + return instantiateAs(Class.forName(classname).asSubclass(asSubclassOfClass), asSubclassOfClass, params, args); + } catch (ClassNotFoundException e) { + throw new IllegalStateException(e); + } + } + /** Instantiates {@code clazz}, after checking it against {@code asSubclassOfClass}, through the public constructor whose parameter types are {@code params}, passing {@code args}. @param clazz class to instantiate @param asSubclassOfClass type {@code clazz} must extend or implement @param params constructor parameter types, in declaration order @param args constructor arguments, matching {@code params} @return the new instance, typed as {@code T} @throws IllegalStateException wrapping any {@link InstantiationException}, {@link IllegalAccessException}, {@link NoSuchMethodException} or {@link InvocationTargetException}; in the last case the constructor's own exception is the cause of that InvocationTargetException, not of the IllegalStateException directly */ + public static <T> T instantiateAs(Class<? extends T> clazz, + Class<T> asSubclassOfClass, + Class<?>[] params, + Object[] args) { + try { + return clazz.asSubclass(asSubclassOfClass).getConstructor(params).newInstance(args); + } catch (InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException ie) { + throw new IllegalStateException(ie); + } + } + + /** Instantiates {@code clazz}, after checking it against {@code asSubclassOfClass}, through its public no-argument constructor. @param clazz class to instantiate @param asSubclassOfClass type {@code clazz} must extend or implement @return the new instance, typed as {@code T} @throws IllegalStateException wrapping any {@link InstantiationException}, {@link IllegalAccessException}, {@link NoSuchMethodException} (no public no-argument constructor) or {@link InvocationTargetException} (the constructor itself threw) */ + public static <T> T instantiateAs(Class<?
extends T> clazz, Class<T> asSubclassOfClass) { + try { + return clazz.asSubclass(asSubclassOfClass).getConstructor().newInstance(); + } catch (InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException ie) { + throw new IllegalStateException(ie); + } + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/common/CommandLineUtil.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/common/CommandLineUtil.java b/mr/src/main/java/org/apache/mahout/common/CommandLineUtil.java new file mode 100644 index 0000000..0cc93ba --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/common/CommandLineUtil.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.mahout.common; + +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; + +import com.google.common.base.Charsets; +import org.apache.commons.cli2.Group; +import org.apache.commons.cli2.OptionException; +import org.apache.commons.cli2.util.HelpFormatter; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.GenericOptionsParser; + /** Static helpers that print command-line usage help for a commons-cli2 option {@link Group}. */ +public final class CommandLineUtil { + /** Not instantiable: the class exposes only static methods. */ + private CommandLineUtil() { } + /** Prints usage help for {@code group} with a default-configured commons-cli2 {@link HelpFormatter}; no print writer is set here, so output goes to that formatter's default writer (presumably standard output -- confirm against the commons-cli2 version in use). @param group command-line options to describe */ + public static void printHelp(Group group) { + HelpFormatter formatter = new HelpFormatter(); + formatter.setGroup(group); + formatter.print(); + } + + /** + * Prints usage help for the job-specific options in {@code group} to standard + * output through an auto-flushing UTF-8 writer, with a footer reminding users to + * give HDFS directories when running on Hadoop and local directories otherwise. + * NOTE(review): despite the method name, nothing here prints the options of + * {@code GenericOptionsParser}: a parser is built over an empty argument list and + * then discarded. Confirm whether a generic-options usage listing was intended. + * + * @param group job-specific command-line options. + * @throws IOException as declared; presumably from the GenericOptionsParser constructor. + */ + public static void printHelpWithGenericOptions(Group group) throws IOException { + new GenericOptionsParser(new Configuration(), new org.apache.commons.cli.Options(), new String[0]); /* result unused; NOTE(review): presumably kept for constructor side effects -- confirm */ + PrintWriter pw = new PrintWriter(new OutputStreamWriter(System.out, Charsets.UTF_8), true); + HelpFormatter formatter = new HelpFormatter(); + formatter.setGroup(group); + formatter.setPrintWriter(pw); + formatter.setFooter("Specify HDFS directories while running on hadoop; else specify local file system directories"); + formatter.print(); + } + /** Prints usage help for {@code group} to standard output through an auto-flushing UTF-8 writer, handing {@code oe} to the formatter via setException (presumably so the parse error is reported with the help); unlike the single-argument overload, no footer is set. NOTE(review): here too the {@code GenericOptionsParser} is constructed and discarded, and its options are not printed. @param group job-specific command-line options @param oe the option-parsing failure to report @throws IOException as declared; presumably from the GenericOptionsParser constructor */ + public static void printHelpWithGenericOptions(Group group, OptionException oe) throws IOException { + new GenericOptionsParser(new Configuration(), new org.apache.commons.cli.Options(), new String[0]); /* result unused; NOTE(review): presumably kept for constructor side effects -- confirm */ + PrintWriter pw = new PrintWriter(new OutputStreamWriter(System.out, Charsets.UTF_8), true); + HelpFormatter formatter = new HelpFormatter(); + formatter.setGroup(group); + formatter.setPrintWriter(pw); + formatter.setException(oe); + formatter.print(); + } + +}
