http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/clustering/streaming/tools/IOUtils.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/clustering/streaming/tools/IOUtils.java b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/clustering/streaming/tools/IOUtils.java
new file mode 100644
index 0000000..bd1149b
--- /dev/null
+++ b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/clustering/streaming/tools/IOUtils.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.streaming.tools;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import org.apache.mahout.clustering.iterator.ClusterWritable;
+import org.apache.mahout.clustering.streaming.mapreduce.CentroidWritable;
+import org.apache.mahout.math.Centroid;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+public class IOUtils {
+
+  private IOUtils() {}
+
+  /**
+   * Converts CentroidWritable values in a sequence file into Centroids lazily.
+   * @param dirIterable the source iterable (comes from a SequenceFileDirIterable).
+   * @return an Iterable<Centroid> with the converted vectors.
+   */
+  public static Iterable<Centroid> getCentroidsFromCentroidWritableIterable(
+      Iterable<CentroidWritable> dirIterable) {
+    return Iterables.transform(dirIterable, new Function<CentroidWritable, Centroid>() {
+      @Override
+      public Centroid apply(CentroidWritable input) {
+        Preconditions.checkNotNull(input);
+        return input.getCentroid().clone();
+      }
+    });
+  }
+
+  /**
+   * Converts ClusterWritable values in a sequence file into Centroids lazily.
+   * @param dirIterable the source iterable (comes from a SequenceFileDirIterable).
+   * @return an Iterable<Centroid> with the converted vectors.
+   */
+  public static Iterable<Centroid> getCentroidsFromClusterWritableIterable(Iterable<ClusterWritable> dirIterable) {
+    return Iterables.transform(dirIterable, new Function<ClusterWritable, Centroid>() {
+      int numClusters = 0;
+      @Override
+      public Centroid apply(ClusterWritable input) {
+        Preconditions.checkNotNull(input);
+        return new Centroid(numClusters++, input.getValue().getCenter().clone(),
+            input.getValue().getTotalObservations());
+      }
+    });
+  }
+
+  /**
+   * Converts VectorWritable values in a sequence file into Vectors lazily.
+   * @param dirIterable the source iterable (comes from a SequenceFileDirIterable).
+   * @return an Iterable<Vector> with the converted vectors.
+   */
+  public static Iterable<Vector> getVectorsFromVectorWritableIterable(Iterable<VectorWritable> dirIterable) {
+    return Iterables.transform(dirIterable, new Function<VectorWritable, Vector>() {
+      @Override
+      public Vector apply(VectorWritable input) {
+        Preconditions.checkNotNull(input);
+        return input.get().clone();
+      }
+    });
+  }
+}
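
For readers following the patch, a minimal sketch of how these helpers are typically consumed. It assumes the SequenceFileDirValueIterable overload taking (Path, PathType, Configuration) from org.apache.mahout.common.iterator.sequencefile, and the output path is hypothetical:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.mahout.clustering.streaming.mapreduce.CentroidWritable;
    import org.apache.mahout.clustering.streaming.tools.IOUtils;
    import org.apache.mahout.common.iterator.sequencefile.PathType;
    import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterable;
    import org.apache.mahout.math.Centroid;

    public class IOUtilsExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Hypothetical path: wherever a streaming k-means run wrote its centroids.
        Path centroids = new Path("output/centroids");
        Iterable<CentroidWritable> writables =
            new SequenceFileDirValueIterable<CentroidWritable>(centroids, PathType.LIST, conf);
        // The transform is lazy: each CentroidWritable is cloned into a Centroid
        // only as the iterator advances, so nothing is buffered in memory.
        for (Centroid c : IOUtils.getCentroidsFromCentroidWritableIterable(writables)) {
          System.out.printf("centroid %d, weight %.1f%n", c.getIndex(), c.getWeight());
        }
      }
    }
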
http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/canopy/Job.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/canopy/Job.java b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/canopy/Job.java
new file mode 100644
index 0000000..083cd8c
--- /dev/null
+++ b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/canopy/Job.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.syntheticcontrol.canopy;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.clustering.canopy.CanopyDriver;
+import org.apache.mahout.clustering.conversion.InputDriver;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.ClassUtils;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
+import org.apache.mahout.utils.clustering.ClusterDumper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Deprecated
+public final class Job extends AbstractJob {
+
+  private static final String DIRECTORY_CONTAINING_CONVERTED_INPUT = "data";
+
+  private Job() {
+  }
+
+  private static final Logger log = LoggerFactory.getLogger(Job.class);
+
+  public static void main(String[] args) throws Exception {
+    if (args.length > 0) {
+      log.info("Running with only user-supplied arguments");
+      ToolRunner.run(new Configuration(), new Job(), args);
+    } else {
+      log.info("Running with default arguments");
+      Path output = new Path("output");
+      HadoopUtil.delete(new Configuration(), output);
+      run(new Path("testdata"), output, new EuclideanDistanceMeasure(), 80, 55);
+    }
+  }
+
+  /**
+   * Run the canopy clustering job on an input dataset using the given distance
+   * measure and t1/t2 thresholds. All output data will be written to the
+   * output directory, which is deleted first if it exists. The clustered
+   * points will reside in the path <output>/clusteredPoints. By default, the
+   * job expects a file containing synthetic_control.data, as obtained from
+   * http://archive.ics.uci.edu/ml/datasets/Synthetic+Control+Chart+Time+Series,
+   * to reside in a directory named "testdata", and writes output to a
+   * directory named "output".
+   *
+   * @param input
+   *          the Path of the input directory
+   * @param output
+   *          the Path of the output directory
+   * @param measure
+   *          the DistanceMeasure to use
+   * @param t1
+   *          the canopy T1 threshold
+   * @param t2
+   *          the canopy T2 threshold
+   */
+  private static void run(Path input, Path output, DistanceMeasure measure,
+      double t1, double t2) throws Exception {
+    Path directoryContainingConvertedInput = new Path(output,
+        DIRECTORY_CONTAINING_CONVERTED_INPUT);
+    InputDriver.runJob(input, directoryContainingConvertedInput,
+        "org.apache.mahout.math.RandomAccessSparseVector");
+    CanopyDriver.run(new Configuration(), directoryContainingConvertedInput,
+        output, measure, t1, t2, true, 0.0, false);
+    // run ClusterDumper
+    ClusterDumper clusterDumper = new ClusterDumper(new Path(output,
+        "clusters-0-final"), new Path(output, "clusteredPoints"));
+    clusterDumper.printClusters(null);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+
+    addInputOption();
+    addOutputOption();
+    addOption(DefaultOptionCreator.distanceMeasureOption().create());
+    addOption(DefaultOptionCreator.t1Option().create());
+    addOption(DefaultOptionCreator.t2Option().create());
+    addOption(DefaultOptionCreator.overwriteOption().create());
+
+    Map<String, List<String>> argMap = parseArguments(args);
+    if (argMap == null) {
+      return -1;
+    }
+
+    Path input = getInputPath();
+    Path output = getOutputPath();
+    if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
+      HadoopUtil.delete(new Configuration(), output);
+    }
+    String measureClass = getOption(DefaultOptionCreator.DISTANCE_MEASURE_OPTION);
+    double t1 = Double.parseDouble(getOption(DefaultOptionCreator.T1_OPTION));
+    double t2 = Double.parseDouble(getOption(DefaultOptionCreator.T2_OPTION));
+    DistanceMeasure measure = ClassUtils.instantiateAs(measureClass, DistanceMeasure.class);
+
+    run(input, output, measure, t1, t2);
+    return 0;
+  }
+
+}
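
The T1/T2 pair passed above (80/55 in the default run) drives the standard canopy rule: T1 is the loose radius within which points join a canopy, and T2 the tight radius within which points stop seeding new canopies, with T1 > T2. A standalone sketch of that rule, for orientation only, not Mahout's implementation:

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    public class CanopySketch {
      // Pick canopy centers from a point set using the T2 rule; points within
      // T1 of a center would also join that canopy as loose members, while
      // points within the tighter T2 can no longer seed canopies of their own.
      public static List<double[]> canopyCenters(List<double[]> points, double t1, double t2) {
        List<double[]> centers = new ArrayList<>();
        List<double[]> remaining = new ArrayList<>(points);
        while (!remaining.isEmpty()) {
          double[] center = remaining.remove(0);   // arbitrary seed
          centers.add(center);
          Iterator<double[]> it = remaining.iterator();
          while (it.hasNext()) {
            double[] p = it.next();
            if (distance(center, p) < t2) {
              it.remove();
            }
          }
        }
        return centers;
      }

      private static double distance(double[] a, double[] b) {
        double sum = 0;
        for (int i = 0; i < a.length; i++) {
          double diff = a[i] - b[i];
          sum += diff * diff;
        }
        return Math.sqrt(sum);
      }
    }
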
http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/fuzzykmeans/Job.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/fuzzykmeans/Job.java b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/fuzzykmeans/Job.java
new file mode 100644
index 0000000..43beb78
--- /dev/null
+++ b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/fuzzykmeans/Job.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.syntheticcontrol.fuzzykmeans;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.clustering.canopy.CanopyDriver;
+import org.apache.mahout.clustering.conversion.InputDriver;
+import org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.ClassUtils;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
+import org.apache.mahout.common.distance.SquaredEuclideanDistanceMeasure;
+import org.apache.mahout.utils.clustering.ClusterDumper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class Job extends AbstractJob {
+
+  private static final Logger log = LoggerFactory.getLogger(Job.class);
+
+  private static final String DIRECTORY_CONTAINING_CONVERTED_INPUT = "data";
+
+  private static final String M_OPTION = FuzzyKMeansDriver.M_OPTION;
+
+  private Job() {
+  }
+
+  public static void main(String[] args) throws Exception {
+    if (args.length > 0) {
+      log.info("Running with only user-supplied arguments");
+      ToolRunner.run(new Configuration(), new Job(), args);
+    } else {
+      log.info("Running with default arguments");
+      Path output = new Path("output");
+      Configuration conf = new Configuration();
+      HadoopUtil.delete(conf, output);
+      run(conf, new Path("testdata"), output, new EuclideanDistanceMeasure(), 80, 55, 10, 2.0f, 0.5);
+    }
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    addInputOption();
+    addOutputOption();
+    addOption(DefaultOptionCreator.distanceMeasureOption().create());
+    addOption(DefaultOptionCreator.convergenceOption().create());
+    addOption(DefaultOptionCreator.maxIterationsOption().create());
+    addOption(DefaultOptionCreator.overwriteOption().create());
+    addOption(DefaultOptionCreator.t1Option().create());
+    addOption(DefaultOptionCreator.t2Option().create());
+    addOption(M_OPTION, M_OPTION, "coefficient normalization factor, must be greater than 1", true);
+
+    Map<String,List<String>> argMap = parseArguments(args);
+    if (argMap == null) {
+      return -1;
+    }
+
+    Path input = getInputPath();
+    Path output = getOutputPath();
+    String measureClass = getOption(DefaultOptionCreator.DISTANCE_MEASURE_OPTION);
+    if (measureClass == null) {
+      measureClass = SquaredEuclideanDistanceMeasure.class.getName();
+    }
+    double convergenceDelta = Double.parseDouble(getOption(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION));
+    int maxIterations = Integer.parseInt(getOption(DefaultOptionCreator.MAX_ITERATIONS_OPTION));
+    float fuzziness = Float.parseFloat(getOption(M_OPTION));
+
+    if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
+      HadoopUtil.delete(getConf(), output);
+    }
+    DistanceMeasure measure = ClassUtils.instantiateAs(measureClass, DistanceMeasure.class);
+    double t1 = Double.parseDouble(getOption(DefaultOptionCreator.T1_OPTION));
+    double t2 = Double.parseDouble(getOption(DefaultOptionCreator.T2_OPTION));
+    run(getConf(), input, output, measure, t1, t2, maxIterations, fuzziness, convergenceDelta);
+    return 0;
+  }
+
+  /**
+   * Run the fuzzy k-means clustering job on an input dataset using the given distance measure, t1, t2 and iteration
+   * parameters. All output data will be written to the output directory, which is deleted first if it exists.
+   * The clustered points will reside in the path <output>/clusteredPoints. By default, the job expects a file
+   * containing synthetic_control.data, as obtained from
+   * http://archive.ics.uci.edu/ml/datasets/Synthetic+Control+Chart+Time+Series, to reside in a directory named
+   * "testdata", and writes output to a directory named "output".
+   *
+   * @param conf
+   *          the Configuration to use
+   * @param input
+   *          the Path of the input directory
+   * @param output
+   *          the Path of the output directory
+   * @param measure
+   *          the DistanceMeasure to use
+   * @param t1
+   *          the canopy T1 threshold
+   * @param t2
+   *          the canopy T2 threshold
+   * @param maxIterations
+   *          the int maximum number of iterations
+   * @param fuzziness
+   *          the float "m" fuzziness coefficient
+   * @param convergenceDelta
+   *          the double convergence criteria for iterations
+   */
+  public static void run(Configuration conf, Path input, Path output, DistanceMeasure measure, double t1, double t2,
+      int maxIterations, float fuzziness, double convergenceDelta) throws Exception {
+    Path directoryContainingConvertedInput = new Path(output, DIRECTORY_CONTAINING_CONVERTED_INPUT);
+    log.info("Preparing Input");
+    InputDriver.runJob(input, directoryContainingConvertedInput, "org.apache.mahout.math.RandomAccessSparseVector");
+    log.info("Running Canopy to get initial clusters");
+    Path canopyOutput = new Path(output, "canopies");
+    CanopyDriver.run(conf, directoryContainingConvertedInput, canopyOutput, measure, t1, t2, false, 0.0, false);
+    log.info("Running FuzzyKMeans");
+    FuzzyKMeansDriver.run(directoryContainingConvertedInput, new Path(canopyOutput, "clusters-0-final"), output,
+        convergenceDelta, maxIterations, fuzziness, true, true, 0.0, false);
+    // run ClusterDumper
+    ClusterDumper clusterDumper = new ClusterDumper(new Path(output, "clusters-*-final"),
+        new Path(output, "clusteredPoints"));
+    clusterDumper.printClusters(null);
+  }
+}
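
The m option above is the standard fuzzy k-means fuzziness exponent. A standalone sketch of the membership formula it controls (the textbook formula, not Mahout's code; it assumes strictly positive distances):

    public class FuzzyMembershipSketch {
      // Soft membership u_j of a point to cluster j, given its distances d[]
      // to all cluster centers: u_j = 1 / sum_k (d_j / d_k)^(2/(m-1)).
      // As m approaches 1 this degenerates to hard k-means; larger m blurs
      // the memberships, which is why the job requires m > 1.
      public static double[] memberships(double[] d, double m) {
        double exponent = 2.0 / (m - 1.0);
        double[] u = new double[d.length];
        for (int j = 0; j < d.length; j++) {
          double sum = 0;
          for (int k = 0; k < d.length; k++) {
            sum += Math.pow(d[j] / d[k], exponent);
          }
          u[j] = 1.0 / sum;
        }
        return u;
      }

      public static void main(String[] args) {
        // A point at distance 1.0 and 3.0 from two centers, with the default
        // m = 2.0 used by this example job.
        double[] u = memberships(new double[] {1.0, 3.0}, 2.0);
        System.out.printf("u = [%.3f, %.3f]%n", u[0], u[1]);  // ~[0.900, 0.100]
      }
    }
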
http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/kmeans/Job.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/kmeans/Job.java b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/kmeans/Job.java
new file mode 100644
index 0000000..70c41fe
--- /dev/null
+++ b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/clustering/syntheticcontrol/kmeans/Job.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.syntheticcontrol.kmeans;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.canopy.CanopyDriver;
+import org.apache.mahout.clustering.conversion.InputDriver;
+import org.apache.mahout.clustering.kmeans.KMeansDriver;
+import org.apache.mahout.clustering.kmeans.RandomSeedGenerator;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.ClassUtils;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
+import org.apache.mahout.common.distance.SquaredEuclideanDistanceMeasure;
+import org.apache.mahout.utils.clustering.ClusterDumper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class Job extends AbstractJob {
+
+  private static final Logger log = LoggerFactory.getLogger(Job.class);
+
+  private static final String DIRECTORY_CONTAINING_CONVERTED_INPUT = "data";
+
+  private Job() {
+  }
+
+  public static void main(String[] args) throws Exception {
+    if (args.length > 0) {
+      log.info("Running with only user-supplied arguments");
+      ToolRunner.run(new Configuration(), new Job(), args);
+    } else {
+      log.info("Running with default arguments");
+      Path output = new Path("output");
+      Configuration conf = new Configuration();
+      HadoopUtil.delete(conf, output);
+      run(conf, new Path("testdata"), output, new EuclideanDistanceMeasure(), 6, 0.5, 10);
+    }
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    addInputOption();
+    addOutputOption();
+    addOption(DefaultOptionCreator.distanceMeasureOption().create());
+    addOption(DefaultOptionCreator.numClustersOption().create());
+    addOption(DefaultOptionCreator.t1Option().create());
+    addOption(DefaultOptionCreator.t2Option().create());
+    addOption(DefaultOptionCreator.convergenceOption().create());
+    addOption(DefaultOptionCreator.maxIterationsOption().create());
+    addOption(DefaultOptionCreator.overwriteOption().create());
+
+    Map<String,List<String>> argMap = parseArguments(args);
+    if (argMap == null) {
+      return -1;
+    }
+
+    Path input = getInputPath();
+    Path output = getOutputPath();
+    String measureClass = getOption(DefaultOptionCreator.DISTANCE_MEASURE_OPTION);
+    if (measureClass == null) {
+      measureClass = SquaredEuclideanDistanceMeasure.class.getName();
+    }
+    double convergenceDelta = Double.parseDouble(getOption(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION));
+    int maxIterations = Integer.parseInt(getOption(DefaultOptionCreator.MAX_ITERATIONS_OPTION));
+    if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
+      HadoopUtil.delete(getConf(), output);
+    }
+    DistanceMeasure measure = ClassUtils.instantiateAs(measureClass, DistanceMeasure.class);
+    if (hasOption(DefaultOptionCreator.NUM_CLUSTERS_OPTION)) {
+      int k = Integer.parseInt(getOption(DefaultOptionCreator.NUM_CLUSTERS_OPTION));
+      run(getConf(), input, output, measure, k, convergenceDelta, maxIterations);
+    } else {
+      double t1 = Double.parseDouble(getOption(DefaultOptionCreator.T1_OPTION));
+      double t2 = Double.parseDouble(getOption(DefaultOptionCreator.T2_OPTION));
+      run(getConf(), input, output, measure, t1, t2, convergenceDelta, maxIterations);
+    }
+    return 0;
+  }
+
+  /**
+   * Run the k-means clustering job on an input dataset using the given number of clusters k and iteration
+   * parameters. All output data will be written to the output directory, which is deleted first if it exists.
+   * The clustered points will reside in the path <output>/clusteredPoints. By default, the job expects a file
+   * containing equal-length, space-delimited data that resides in a directory named "testdata", and writes output
+   * to a directory named "output".
+   *
+   * @param conf
+   *          the Configuration to use
+   * @param input
+   *          the Path of the input directory
+   * @param output
+   *          the Path of the output directory
+   * @param measure
+   *          the DistanceMeasure to use
+   * @param k
+   *          the number of clusters in Kmeans
+   * @param convergenceDelta
+   *          the double convergence criteria for iterations
+   * @param maxIterations
+   *          the int maximum number of iterations
+   */
+  public static void run(Configuration conf, Path input, Path output, DistanceMeasure measure, int k,
+      double convergenceDelta, int maxIterations) throws Exception {
+    Path directoryContainingConvertedInput = new Path(output, DIRECTORY_CONTAINING_CONVERTED_INPUT);
+    log.info("Preparing Input");
+    InputDriver.runJob(input, directoryContainingConvertedInput, "org.apache.mahout.math.RandomAccessSparseVector");
+    log.info("Running random seed to get initial clusters");
+    Path clusters = new Path(output, "random-seeds");
+    clusters = RandomSeedGenerator.buildRandom(conf, directoryContainingConvertedInput, clusters, k, measure);
+    log.info("Running KMeans with k = {}", k);
+    KMeansDriver.run(conf, directoryContainingConvertedInput, clusters, output, convergenceDelta,
+        maxIterations, true, 0.0, false);
+    // run ClusterDumper
+    Path outGlob = new Path(output, "clusters-*-final");
+    Path clusteredPoints = new Path(output, "clusteredPoints");
+    log.info("Dumping out clusters from clusters: {} and clusteredPoints: {}", outGlob, clusteredPoints);
+    ClusterDumper clusterDumper = new ClusterDumper(outGlob, clusteredPoints);
+    clusterDumper.printClusters(null);
+  }
+
+  /**
+   * Run the k-means clustering job on an input dataset using the given distance measure, t1, t2 and iteration
+   * parameters. All output data will be written to the output directory, which is deleted first if it exists.
+   * The clustered points will reside in the path <output>/clusteredPoints. By default, the job expects a file
+   * containing synthetic_control.data, as obtained from
+   * http://archive.ics.uci.edu/ml/datasets/Synthetic+Control+Chart+Time+Series, to reside in a directory named
+   * "testdata", and writes output to a directory named "output".
+   *
+   * @param conf
+   *          the Configuration to use
+   * @param input
+   *          the Path of the input directory
+   * @param output
+   *          the Path of the output directory
+   * @param measure
+   *          the DistanceMeasure to use
+   * @param t1
+   *          the canopy T1 threshold
+   * @param t2
+   *          the canopy T2 threshold
+   * @param convergenceDelta
+   *          the double convergence criteria for iterations
+   * @param maxIterations
+   *          the int maximum number of iterations
+   */
+  public static void run(Configuration conf, Path input, Path output, DistanceMeasure measure, double t1, double t2,
+      double convergenceDelta, int maxIterations) throws Exception {
+    Path directoryContainingConvertedInput = new Path(output, DIRECTORY_CONTAINING_CONVERTED_INPUT);
+    log.info("Preparing Input");
+    InputDriver.runJob(input, directoryContainingConvertedInput, "org.apache.mahout.math.RandomAccessSparseVector");
+    log.info("Running Canopy to get initial clusters");
+    Path canopyOutput = new Path(output, "canopies");
+    CanopyDriver.run(conf, directoryContainingConvertedInput, canopyOutput, measure, t1, t2, false, 0.0,
+        false);
+    log.info("Running KMeans");
+    KMeansDriver.run(conf, directoryContainingConvertedInput, new Path(canopyOutput, Cluster.INITIAL_CLUSTERS_DIR
+        + "-final"), output, convergenceDelta, maxIterations, true, 0.0, false);
+    // run ClusterDumper
+    ClusterDumper clusterDumper = new ClusterDumper(new Path(output, "clusters-*-final"), new Path(output,
+        "clusteredPoints"));
+    clusterDumper.printClusters(null);
+  }
+}
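
Both run() variants above hand off to KMeansDriver, which iterates Lloyd-style assign/update passes until convergenceDelta or maxIterations is reached. A standalone sketch of one such pass, for orientation only, not Mahout's implementation:

    public class KMeansStepSketch {
      // One Lloyd iteration: assign each point to its nearest center, then
      // move each center to the mean of its assigned points. Returns the
      // largest center movement, to compare against convergenceDelta.
      public static double step(double[][] points, double[][] centers) {
        int k = centers.length;
        int dim = centers[0].length;
        double[][] sums = new double[k][dim];
        int[] counts = new int[k];
        for (double[] p : points) {
          int best = 0;
          double bestDist = Double.MAX_VALUE;
          for (int j = 0; j < k; j++) {
            double d = 0;
            for (int i = 0; i < dim; i++) {
              double diff = p[i] - centers[j][i];
              d += diff * diff;
            }
            if (d < bestDist) { bestDist = d; best = j; }
          }
          counts[best]++;
          for (int i = 0; i < dim; i++) sums[best][i] += p[i];
        }
        double maxShift = 0;
        for (int j = 0; j < k; j++) {
          if (counts[j] == 0) continue;   // empty cluster: keep old center
          double shift = 0;
          for (int i = 0; i < dim; i++) {
            double mean = sums[j][i] / counts[j];
            shift += Math.abs(mean - centers[j][i]);
            centers[j][i] = mean;
          }
          maxShift = Math.max(maxShift, shift);
        }
        return maxShift;
      }
    }
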
http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/DeliciousTagsExample.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/DeliciousTagsExample.java b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/DeliciousTagsExample.java
new file mode 100644
index 0000000..92363e5
--- /dev/null
+++ b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/DeliciousTagsExample.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth;
+
+import java.io.IOException;
+
+import org.apache.commons.cli2.CommandLine;
+import org.apache.commons.cli2.Group;
+import org.apache.commons.cli2.Option;
+import org.apache.commons.cli2.OptionException;
+import org.apache.commons.cli2.builder.ArgumentBuilder;
+import org.apache.commons.cli2.builder.DefaultOptionBuilder;
+import org.apache.commons.cli2.builder.GroupBuilder;
+import org.apache.commons.cli2.commandline.Parser;
+import org.apache.mahout.common.CommandLineUtil;
+import org.apache.mahout.common.Parameters;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.fpm.pfpgrowth.dataset.KeyBasedStringTupleGrouper;
+
+public final class DeliciousTagsExample {
+  private DeliciousTagsExample() { }
+
+  public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
+    DefaultOptionBuilder obuilder = new DefaultOptionBuilder();
+    ArgumentBuilder abuilder = new ArgumentBuilder();
+    GroupBuilder gbuilder = new GroupBuilder();
+    Option inputDirOpt = DefaultOptionCreator.inputOption().create();
+
+    Option outputOpt = DefaultOptionCreator.outputOption().create();
+
+    Option helpOpt = DefaultOptionCreator.helpOption();
+    Option recordSplitterOpt = obuilder.withLongName("splitterPattern").withArgument(
+      abuilder.withName("splitterPattern").withMinimum(1).withMaximum(1).create()).withDescription(
+      "Regular expression pattern used to split a given line into fields."
+          + " The default value splits comma- or tab-separated fields."
+          + " Default value: \"[ ,\\t]*\\t[ ,\\t]*\"").withShortName("regex").create();
+    Option encodingOpt = obuilder.withLongName("encoding").withArgument(
+      abuilder.withName("encoding").withMinimum(1).withMaximum(1).create()).withDescription(
+      "(Optional) The file encoding. Default value: UTF-8").withShortName("e").create();
+    Group group = gbuilder.withName("Options").withOption(inputDirOpt).withOption(outputOpt).withOption(
+      helpOpt).withOption(recordSplitterOpt).withOption(encodingOpt).create();
+
+    try {
+      Parser parser = new Parser();
+      parser.setGroup(group);
+      CommandLine cmdLine = parser.parse(args);
+
+      if (cmdLine.hasOption(helpOpt)) {
+        CommandLineUtil.printHelp(group);
+        return;
+      }
+      Parameters params = new Parameters();
+      if (cmdLine.hasOption(recordSplitterOpt)) {
+        params.set("splitPattern", (String) cmdLine.getValue(recordSplitterOpt));
+      }
+
+      String encoding = "UTF-8";
+      if (cmdLine.hasOption(encodingOpt)) {
+        encoding = (String) cmdLine.getValue(encodingOpt);
+      }
+      params.set("encoding", encoding);
+      String inputDir = (String) cmdLine.getValue(inputDirOpt);
+      String outputDir = (String) cmdLine.getValue(outputOpt);
+      params.set("input", inputDir);
+      params.set("output", outputDir);
+      params.set("groupingFieldCount", "2");
+      params.set("gfield0", "1");
+      params.set("gfield1", "2");
+      params.set("selectedFieldCount", "1");
+      params.set("field0", "3");
+      params.set("maxTransactionLength", "100");
+      KeyBasedStringTupleGrouper.startJob(params);
+
+    } catch (OptionException ex) {
+      CommandLineUtil.printHelp(group);
+    }
+
+  }
+}
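
A quick sketch of how the default splitterPattern tokenizes a record; the sample line is hypothetical, but the pattern is the one shown in the help text above (plain JDK regex, no Mahout code involved):

    import java.util.Arrays;
    import java.util.regex.Pattern;

    public class SplitPatternExample {
      public static void main(String[] args) {
        // Tabs, optionally padded by spaces or commas, delimit the fields.
        Pattern splitter = Pattern.compile("[ ,\t]*\t[ ,\t]*");
        // Hypothetical tab-separated record: date, user, URL, tag.
        String line = "2008-09-01\tuser42\thttp://example.org\thadoop";
        System.out.println(Arrays.toString(splitter.split(line)));
        // -> [2008-09-01, user42, http://example.org, hadoop]
      }
    }
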
http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/dataset/KeyBasedStringTupleCombiner.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/dataset/KeyBasedStringTupleCombiner.java b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/dataset/KeyBasedStringTupleCombiner.java
new file mode 100644
index 0000000..4c80a31
--- /dev/null
+++ b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/dataset/KeyBasedStringTupleCombiner.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth.dataset;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.common.StringTuple;
+
+public class KeyBasedStringTupleCombiner extends Reducer<Text,StringTuple,Text,StringTuple> {
+
+  @Override
+  protected void reduce(Text key,
+                        Iterable<StringTuple> values,
+                        Context context) throws IOException, InterruptedException {
+    Set<String> outputValues = new HashSet<>();
+    for (StringTuple value : values) {
+      outputValues.addAll(value.getEntries());
+    }
+    context.write(key, new StringTuple(outputValues));
+  }
+}
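
This combiner is safe to apply to partial value groups because set union is associative and commutative: combining map-side and then reducing yields the same final set as reducing everything at once. A tiny sketch of that property:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    public class UnionCombineSketch {
      public static void main(String[] args) {
        Set<String> combined = new HashSet<>(Arrays.asList("a", "b"));
        combined.addAll(Arrays.asList("b", "c"));            // map-side combine
        Set<String> direct = new HashSet<>(Arrays.asList("a", "b", "b", "c"));
        System.out.println(combined.equals(direct));          // true
      }
    }
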
http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/dataset/KeyBasedStringTupleGrouper.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/dataset/KeyBasedStringTupleGrouper.java b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/dataset/KeyBasedStringTupleGrouper.java
new file mode 100644
index 0000000..cd17770
--- /dev/null
+++ b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/dataset/KeyBasedStringTupleGrouper.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth.dataset;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.Parameters;
+import org.apache.mahout.common.StringTuple;
+
+public final class KeyBasedStringTupleGrouper {
+
+  private KeyBasedStringTupleGrouper() { }
+
+  public static void startJob(Parameters params) throws IOException,
+                                                InterruptedException,
+                                                ClassNotFoundException {
+    Configuration conf = new Configuration();
+
+    conf.set("job.parameters", params.toString());
+    conf.set("mapred.compress.map.output", "true");
+    conf.set("mapred.output.compression.type", "BLOCK");
+    conf.set("mapred.map.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec");
+    conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
+                                  + "org.apache.hadoop.io.serializer.WritableSerialization");
+
+    String input = params.get("input");
+    Job job = new Job(conf, "Generating dataset from input " + input);
+    job.setJarByClass(KeyBasedStringTupleGrouper.class);
+
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(StringTuple.class);
+
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
+
+    FileInputFormat.addInputPath(job, new Path(input));
+    Path outPath = new Path(params.get("output"));
+    FileOutputFormat.setOutputPath(job, outPath);
+
+    HadoopUtil.delete(conf, outPath);
+
+    job.setInputFormatClass(TextInputFormat.class);
+    job.setMapperClass(KeyBasedStringTupleMapper.class);
+    job.setCombinerClass(KeyBasedStringTupleCombiner.class);
+    job.setReducerClass(KeyBasedStringTupleReducer.class);
+    job.setOutputFormatClass(TextOutputFormat.class);
+
+    boolean succeeded = job.waitForCompletion(true);
+    if (!succeeded) {
+      throw new IllegalStateException("Job failed!");
+    }
+  }
+}
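
A minimal programmatic invocation, mirroring the Parameters that DeliciousTagsExample assembles above; the input and output paths are hypothetical:

    import org.apache.mahout.common.Parameters;
    import org.apache.mahout.fpm.pfpgrowth.dataset.KeyBasedStringTupleGrouper;

    public class GrouperExample {
      public static void main(String[] args) throws Exception {
        Parameters params = new Parameters();
        params.set("input", "/data/delicious.tsv");       // hypothetical path
        params.set("output", "/data/delicious-grouped");  // hypothetical path
        params.set("groupingFieldCount", "2");  // key = fields 1 and 2
        params.set("gfield0", "1");
        params.set("gfield1", "2");
        params.set("selectedFieldCount", "1");  // value = field 3
        params.set("field0", "3");
        params.set("maxTransactionLength", "100");
        // startJob deletes the output path itself before launching the job.
        KeyBasedStringTupleGrouper.startJob(params);
      }
    }
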
http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/dataset/KeyBasedStringTupleMapper.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/dataset/KeyBasedStringTupleMapper.java b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/dataset/KeyBasedStringTupleMapper.java
new file mode 100644
index 0000000..362d1ce
--- /dev/null
+++ b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/dataset/KeyBasedStringTupleMapper.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth.dataset;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.common.Parameters;
+import org.apache.mahout.common.StringTuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Splits each input line using a {@link Pattern} and emits the key given by the grouping fields.
+ */
+public class KeyBasedStringTupleMapper extends Mapper<LongWritable,Text,Text,StringTuple> {
+
+  private static final Logger log = LoggerFactory.getLogger(KeyBasedStringTupleMapper.class);
+
+  private Pattern splitter;
+
+  private int[] selectedFields;
+
+  private int[] groupingFields;
+
+  @Override
+  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+    String[] fields = splitter.split(value.toString());
+    if (fields.length != 4) {
+      log.info("{} {}", fields.length, value.toString());
+      context.getCounter("Map", "ERROR").increment(1);
+      return;
+    }
+    Collection<String> oKey = new ArrayList<>();
+    for (int groupingField : groupingFields) {
+      oKey.add(fields[groupingField]);
+      context.setStatus(fields[groupingField]);
+    }
+
+    List<String> oValue = new ArrayList<>();
+    for (int selectedField : selectedFields) {
+      oValue.add(fields[selectedField]);
+    }
+
+    context.write(new Text(oKey.toString()), new StringTuple(oValue));
+  }
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    super.setup(context);
+    Parameters params = new Parameters(context.getConfiguration().get("job.parameters", ""));
+    splitter = Pattern.compile(params.get("splitPattern", "[ \t]*\t[ \t]*"));
+
+    int selectedFieldCount = Integer.parseInt(params.get("selectedFieldCount", "0"));
+    selectedFields = new int[selectedFieldCount];
+    for (int i = 0; i < selectedFieldCount; i++) {
+      selectedFields[i] = Integer.parseInt(params.get("field" + i, "0"));
+    }
+
+    int groupingFieldCount = Integer.parseInt(params.get("groupingFieldCount", "0"));
+    groupingFields = new int[groupingFieldCount];
+    for (int i = 0; i < groupingFieldCount; i++) {
+      groupingFields[i] = Integer.parseInt(params.get("gfield" + i, "0"));
+    }
+  }
+}
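
Given the gfield0=1, gfield1=2, field0=3 configuration that DeliciousTagsExample sets up, a worked example of the map step on one hypothetical tab-separated record:

    input line:  "2008-09-01<TAB>user42<TAB>http://example.org<TAB>hadoop"
    fields:      [2008-09-01, user42, http://example.org, hadoop]
    output key:  [user42, http://example.org]     (oKey.toString())
    output val:  StringTuple(["hadoop"])

Lines that do not split into exactly four fields increment the ("Map", "ERROR") counter and are skipped.
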
http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/dataset/KeyBasedStringTupleReducer.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/dataset/KeyBasedStringTupleReducer.java b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/dataset/KeyBasedStringTupleReducer.java
new file mode 100644
index 0000000..a7ef762
--- /dev/null
+++ b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/fpm/pfpgrowth/dataset/KeyBasedStringTupleReducer.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.fpm.pfpgrowth.dataset;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.common.Parameters;
+import org.apache.mahout.common.StringTuple;
+
+public class KeyBasedStringTupleReducer extends Reducer<Text,StringTuple,Text,Text> {
+
+  private int maxTransactionLength = 100;
+
+  @Override
+  protected void reduce(Text key, Iterable<StringTuple> values, Context context)
+    throws IOException, InterruptedException {
+    Collection<String> items = new HashSet<>();
+
+    for (StringTuple value : values) {
+      items.addAll(value.getEntries());
+    }
+    if (items.size() > 1) {
+      int i = 0;
+      StringBuilder sb = new StringBuilder();
+      String sep = "";
+      for (String field : items) {
+        if (i % maxTransactionLength == 0) {
+          if (i != 0) {
+            context.write(null, new Text(sb.toString()));
+          }
+          sb.setLength(0);
+          sep = "";
+        }
+
+        sb.append(sep).append(field);
+        sep = "\t";
+
+        i++;
+      }
+      if (sb.length() > 0) {
+        context.write(null, new Text(sb.toString()));
+      }
+    }
+  }
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    super.setup(context);
+    Parameters params = new Parameters(context.getConfiguration().get("job.parameters", ""));
+    maxTransactionLength = Integer.parseInt(params.get("maxTransactionLength", "100"));
+  }
+}
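
The reducer above writes the distinct items for each key as tab-separated "transactions" of at most maxTransactionLength items, dropping single-item groups. A standalone sketch of the chunking, using a deliberately small limit to show the split:

    import java.util.Arrays;
    import java.util.Collection;
    import java.util.LinkedHashSet;

    public class TransactionChunkSketch {
      public static void main(String[] args) {
        Collection<String> items = new LinkedHashSet<>(
            Arrays.asList("hadoop", "mahout", "ml", "java", "fpgrowth"));
        int maxTransactionLength = 3;  // small value to show the chunking
        StringBuilder sb = new StringBuilder();
        int i = 0;
        for (String item : items) {
          if (i > 0 && i % maxTransactionLength == 0) {
            System.out.println(sb);    // emit a full transaction
            sb.setLength(0);
          }
          if (sb.length() > 0) {
            sb.append('\t');
          }
          sb.append(item);
          i++;
        }
        if (sb.length() > 0) {
          System.out.println(sb);      // emit the trailing partial transaction
        }
        // prints: hadoop  mahout  ml
        //         java    fpgrowth
      }
    }
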
