http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java ---------------------------------------------------------------------- diff --git a/mr/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java b/mr/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java new file mode 100644 index 0000000..35de87e --- /dev/null +++ b/mr/src/test/java/org/apache/mahout/clustering/canopy/TestCanopyCreation.java @@ -0,0 +1,674 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.mahout.clustering.canopy;

import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

import com.google.common.collect.Iterables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.clustering.ClusteringTestUtils;
import org.apache.mahout.clustering.iterator.ClusterWritable;
import org.apache.mahout.common.DummyRecordWriter;
import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.common.MahoutTestCase;
import org.apache.mahout.common.Pair;
import org.apache.mahout.common.commandline.DefaultOptionCreator;
import org.apache.mahout.common.distance.DistanceMeasure;
import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
import org.apache.mahout.common.distance.ManhattanDistanceMeasure;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterable;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
import org.junit.Before;
import org.junit.Test;

import com.google.common.collect.Lists;
import com.google.common.io.Closeables;

/**
 * Tests canopy clustering: the in-memory reference implementation
 * ({@link CanopyClusterer#createCanopies}), {@link CanopyMapper} and
 * {@link CanopyReducer} in isolation, and {@link CanopyDriver} in both
 * sequential and map/reduce modes.
 *
 * <p>Every test clusters the points in {@link #RAW} with the thresholds
 * {@link #T1} and {@link #T2}.</p>
 */
@Deprecated
public final class TestCanopyCreation extends MahoutTestCase {

  /** Two-dimensional input points shared by all tests. */
  private static final double[][] RAW = { { 1, 1 }, { 2, 1 }, { 1, 2 },
      { 2, 2 }, { 3, 3 }, { 4, 4 }, { 5, 4 }, { 4, 5 }, { 5, 5 } };

  /** Canopy T1 (loose) distance threshold used throughout. */
  private static final double T1 = 3.1;

  /** Canopy T2 (tight) distance threshold used throughout. */
  private static final double T2 = 2.1;

  private List<Canopy> referenceManhattan;

  private final DistanceMeasure manhattanDistanceMeasure = new ManhattanDistanceMeasure();

  private List<Vector> manhattanCentroids;

  private List<Canopy> referenceEuclidean;

  private final DistanceMeasure euclideanDistanceMeasure = new EuclideanDistanceMeasure();

  private List<Vector> euclideanCentroids;

  private FileSystem fs;

  /** Returns {@link #RAW} as sparse vectors wrapped in {@link VectorWritable}s. */
  private static List<VectorWritable> getPointsWritable() {
    List<VectorWritable> points = Lists.newArrayList();
    for (double[] fr : RAW) {
      Vector vec = new RandomAccessSparseVector(fr.length);
      vec.assign(fr);
      points.add(new VectorWritable(vec));
    }
    return points;
  }

  /** Returns {@link #RAW} as sparse vectors. */
  private static List<Vector> getPoints() {
    List<Vector> points = Lists.newArrayList();
    for (double[] fr : RAW) {
      Vector vec = new RandomAccessSparseVector(fr.length);
      vec.assign(fr);
      points.add(vec);
    }
    return points;
  }

  /**
   * Print the canopies to the transcript.
   *
   * @param canopies
   *          the canopies to print, one per line
   */
  private static void printCanopies(Iterable<Canopy> canopies) {
    for (Canopy canopy : canopies) {
      System.out.println(canopy.asFormatString(null));
    }
  }

  /**
   * Sets the distance measure, T1/T2 thresholds and cluster filter on
   * {@code conf}, as read by {@link CanopyMapper} and {@link CanopyReducer}.
   */
  private static void configureCanopy(Configuration conf, String measureClassName, String clusterFilter) {
    conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY, measureClassName);
    conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(T1));
    conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(T2));
    conf.set(CanopyConfigKeys.CF_KEY, clusterFilter);
  }

  /** Feeds every input point through a fresh {@link CanopyMapper} and returns its captured output. */
  private static DummyRecordWriter<Text, VectorWritable> runMapper(Configuration conf) throws Exception {
    CanopyMapper mapper = new CanopyMapper();
    DummyRecordWriter<Text, VectorWritable> writer = new DummyRecordWriter<Text, VectorWritable>();
    Mapper<WritableComparable<?>, VectorWritable, Text, VectorWritable>.Context context =
        DummyRecordWriter.build(mapper, conf, writer);
    mapper.setup(context);
    for (VectorWritable point : getPointsWritable()) {
      mapper.map(new Text(), point, context);
    }
    // the mapper emits its canopy centroids only on cleanup
    mapper.cleanup(context);
    return writer;
  }

  /** Reduces every input point under the key "centroid" with a fresh {@link CanopyReducer}. */
  private static DummyRecordWriter<Text, ClusterWritable> runReducer(Configuration conf) throws Exception {
    CanopyReducer reducer = new CanopyReducer();
    DummyRecordWriter<Text, ClusterWritable> writer = new DummyRecordWriter<Text, ClusterWritable>();
    Reducer<Text, VectorWritable, Text, ClusterWritable>.Context context =
        DummyRecordWriter.build(reducer, conf, writer, Text.class, VectorWritable.class);
    reducer.setup(context);
    reducer.reduce(new Text("centroid"), getPointsWritable(), context);
    return writer;
  }

  /**
   * Asserts that {@code canopies} has the expected per-canopy observation
   * counts and centroids, in order.
   */
  private void assertCanopies(List<Canopy> canopies, int[] expectedNumPoints, double[][] expectedCentroids) {
    assertEquals("number of canopies", expectedNumPoints.length, canopies.size());
    for (int canopyIx = 0; canopyIx < canopies.size(); canopyIx++) {
      Canopy testCanopy = canopies.get(canopyIx);
      assertEquals("canopy points " + canopyIx, expectedNumPoints[canopyIx], testCanopy.getNumObservations());
      double[] refCentroid = expectedCentroids[canopyIx];
      Vector testCentroid = testCanopy.computeCentroid();
      for (int pointIx = 0; pointIx < refCentroid.length; pointIx++) {
        assertEquals("canopy centroid " + canopyIx + '[' + pointIx + ']',
            refCentroid[pointIx], testCentroid.get(pointIx), EPSILON);
      }
    }
  }

  /** Asserts the mapper wrote one key whose values match {@code expected} in order. */
  private static void assertMapperCentroids(DummyRecordWriter<Text, VectorWritable> writer, List<Vector> expected) {
    assertEquals("Number of map results", 1, writer.getData().size());
    List<VectorWritable> data = writer.getValue(new Text("centroid"));
    assertEquals("Number of centroids", expected.size(), data.size());
    for (int i = 0; i < data.size(); i++) {
      assertEquals("Centroid error", expected.get(i).asFormatString(), data.get(i).get().asFormatString());
    }
  }

  /** Asserts the reducer emitted one canopy per expected centroid, in insertion order. */
  private static void assertReducerCentroids(DummyRecordWriter<Text, ClusterWritable> writer, List<Vector> expected) {
    Iterable<Text> keys = writer.getKeysInInsertionOrder();
    assertEquals("Number of centroids", expected.size(), Iterables.size(keys));
    int i = 0;
    for (Text key : keys) {
      Canopy canopy = (Canopy) writer.getValue(key).get(0).getValue();
      Vector centroid = canopy.computeCentroid();
      assertEquals(expected.get(i).asFormatString() + " is not equal to " + centroid.asFormatString(),
          expected.get(i), centroid);
      i++;
    }
  }

  /**
   * Asserts that {@code output}/clusters-0-final/part-r-00000 contains exactly
   * the clusters C-0 and C-1, each centered on one of {@code refCenters}
   * (in either order). Matched entries are removed from {@code refCenters}.
   */
  private void assertTwoFinalCenters(Path output, Configuration config, List<Pair<Double, Double>> refCenters)
      throws Exception {
    Path path = new Path(output, "clusters-0-final/part-r-00000");
    FileSystem outputFs = FileSystem.get(path.toUri(), config);
    SequenceFile.Reader reader = new SequenceFile.Reader(outputFs, path, config);
    try {
      Writable key = new Text();
      ClusterWritable clusterWritable = new ClusterWritable();
      assertTrue("more to come", reader.next(key, clusterWritable));
      assertEquals("1st key", "C-0", key.toString());
      Pair<Double, Double> c = centerOf(clusterWritable);
      assertTrue("center " + c + " not found", findAndRemove(c, refCenters, EPSILON));
      assertTrue("more to come", reader.next(key, clusterWritable));
      assertEquals("2nd key", "C-1", key.toString());
      c = centerOf(clusterWritable);
      assertTrue("center " + c + " not found", findAndRemove(c, refCenters, EPSILON));
      assertFalse("more to come", reader.next(key, clusterWritable));
    } finally {
      Closeables.close(reader, true);
    }
  }

  /** Returns the first two coordinates of the cluster's center. */
  private static Pair<Double, Double> centerOf(ClusterWritable clusterWritable) {
    Vector center = clusterWritable.getValue().getCenter();
    return new Pair<Double, Double>(center.get(0), center.get(1));
  }

  /**
   * Asserts that {@code output}/clusters-0-final/part-r-00000 holds exactly
   * the {@code expected} centers, in order. The count check stops the test from
   * passing vacuously when fewer clusters are written.
   */
  private static void assertFinalCenters(Path output, List<Vector> expected, Configuration config)
      throws Exception {
    Path path = new Path(output, "clusters-0-final/part-r-00000");
    int ix = 0;
    for (ClusterWritable clusterWritable : new SequenceFileValueIterable<ClusterWritable>(path, true, config)) {
      assertEquals("Center [" + ix + ']', expected.get(ix), clusterWritable.getValue().getCenter());
      ix++;
    }
    assertEquals("number of centers", expected.size(), ix);
  }

  @Override
  @Before
  public void setUp() throws Exception {
    super.setUp();
    fs = FileSystem.get(getConfiguration());
    referenceManhattan = CanopyClusterer.createCanopies(getPoints(), manhattanDistanceMeasure, T1, T2);
    manhattanCentroids = CanopyClusterer.getCenters(referenceManhattan);
    referenceEuclidean = CanopyClusterer.createCanopies(getPoints(), euclideanDistanceMeasure, T1, T2);
    euclideanCentroids = CanopyClusterer.getCenters(referenceEuclidean);
  }

  /**
   * Story: User can cluster points using a ManhattanDistanceMeasure and a
   * reference implementation
   */
  @Test
  public void testReferenceManhattan() throws Exception {
    // see setUp for cluster creation
    printCanopies(referenceManhattan);
    int[] expectedNumPoints = { 4, 4, 3 };
    double[][] expectedCentroids = { { 1.5, 1.5 }, { 4.0, 4.0 },
        { 4.666666666666667, 4.666666666666667 } };
    assertCanopies(referenceManhattan, expectedNumPoints, expectedCentroids);
  }

  /**
   * Story: User can cluster points using a EuclideanDistanceMeasure and a
   * reference implementation
   */
  @Test
  public void testReferenceEuclidean() throws Exception {
    // see setUp for cluster creation
    printCanopies(referenceEuclidean);
    int[] expectedNumPoints = { 5, 5, 3 };
    double[][] expectedCentroids = { { 1.8, 1.8 }, { 4.2, 4.2 },
        { 4.666666666666667, 4.666666666666667 } };
    assertCanopies(referenceEuclidean, expectedNumPoints, expectedCentroids);
  }

  /**
   * Story: User can produce initial canopy centers using a
   * ManhattanDistanceMeasure and a CanopyMapper which clusters input points to
   * produce an output set of canopy centroid points.
   */
  @Test
  public void testCanopyMapperManhattan() throws Exception {
    Configuration conf = getConfiguration();
    configureCanopy(conf, manhattanDistanceMeasure.getClass().getName(), "0");
    assertMapperCentroids(runMapper(conf), manhattanCentroids);
  }

  /**
   * Story: User can produce initial canopy centers using a
   * EuclideanDistanceMeasure and a CanopyMapper/Combiner which clusters input
   * points to produce an output set of canopy centroid points.
   */
  @Test
  public void testCanopyMapperEuclidean() throws Exception {
    Configuration conf = getConfiguration();
    configureCanopy(conf, euclideanDistanceMeasure.getClass().getName(), "0");
    assertMapperCentroids(runMapper(conf), euclideanCentroids);
  }

  /**
   * Story: User can produce final canopy centers using a
   * ManhattanDistanceMeasure and a CanopyReducer which clusters input centroid
   * points to produce an output set of final canopy centroid points.
   */
  @Test
  public void testCanopyReducerManhattan() throws Exception {
    Configuration conf = getConfiguration();
    configureCanopy(conf, "org.apache.mahout.common.distance.ManhattanDistanceMeasure", "0");
    assertReducerCentroids(runReducer(conf), manhattanCentroids);
  }

  /**
   * Story: User can produce final canopy centers using a
   * EuclideanDistanceMeasure and a CanopyReducer which clusters input centroid
   * points to produce an output set of final canopy centroid points.
   */
  @Test
  public void testCanopyReducerEuclidean() throws Exception {
    Configuration conf = getConfiguration();
    configureCanopy(conf, "org.apache.mahout.common.distance.EuclideanDistanceMeasure", "0");
    assertReducerCentroids(runReducer(conf), euclideanCentroids);
  }

  /**
   * Story: User can produce final canopy centers using a Hadoop map/reduce job
   * and a ManhattanDistanceMeasure.
   */
  @Test
  public void testCanopyGenManhattanMR() throws Exception {
    List<VectorWritable> points = getPointsWritable();
    Configuration config = getConfiguration();
    ClusteringTestUtils.writePointsToFile(points, getTestTempFilePath("testdata/file1"), fs, config);
    ClusteringTestUtils.writePointsToFile(points, getTestTempFilePath("testdata/file2"), fs, config);
    // now run the Canopy Driver
    Path output = getTestTempDirPath("output");
    CanopyDriver.run(config, getTestTempDirPath("testdata"), output,
        manhattanDistanceMeasure, T1, T2, false, 0.0, false);

    List<Pair<Double, Double>> refCenters = Lists.newArrayList();
    refCenters.add(new Pair<Double, Double>(1.5, 1.5));
    refCenters.add(new Pair<Double, Double>(4.333333333333334, 4.333333333333334));
    assertTwoFinalCenters(output, config, refCenters);
  }

  /**
   * Finds the first element of {@code list} whose coordinates are both within
   * {@code epsilon} of {@code target}, removes it, and returns {@code true};
   * returns {@code false} (leaving {@code list} unchanged) if none matches.
   */
  static boolean findAndRemove(Pair<Double, Double> target, Collection<Pair<Double, Double>> list,
      double epsilon) {
    Iterator<Pair<Double, Double>> it = list.iterator();
    while (it.hasNext()) {
      Pair<Double, Double> curr = it.next();
      if (Math.abs(target.getFirst() - curr.getFirst()) < epsilon
          && Math.abs(target.getSecond() - curr.getSecond()) < epsilon) {
        // remove through the iterator: safe, and removes exactly the matched element
        it.remove();
        return true;
      }
    }
    return false;
  }

  /**
   * Story: User can produce final canopy centers using a Hadoop map/reduce job
   * and a EuclideanDistanceMeasure.
   */
  @Test
  public void testCanopyGenEuclideanMR() throws Exception {
    List<VectorWritable> points = getPointsWritable();
    Configuration config = getConfiguration();
    ClusteringTestUtils.writePointsToFile(points, getTestTempFilePath("testdata/file1"), fs, config);
    ClusteringTestUtils.writePointsToFile(points, getTestTempFilePath("testdata/file2"), fs, config);
    // now run the Canopy Driver
    Path output = getTestTempDirPath("output");
    CanopyDriver.run(config, getTestTempDirPath("testdata"), output,
        euclideanDistanceMeasure, T1, T2, false, 0.0, false);

    List<Pair<Double, Double>> refCenters = Lists.newArrayList();
    refCenters.add(new Pair<Double, Double>(1.8, 1.8));
    refCenters.add(new Pair<Double, Double>(4.433333333333334, 4.433333333333334));
    assertTwoFinalCenters(output, config, refCenters);
  }

  /** Story: User can cluster points using sequential execution */
  @Test
  public void testClusteringManhattanSeq() throws Exception {
    List<VectorWritable> points = getPointsWritable();
    Configuration config = getConfiguration();
    ClusteringTestUtils.writePointsToFile(points, getTestTempFilePath("testdata/file1"), fs, config);
    // now run the Canopy Driver in sequential mode
    Path output = getTestTempDirPath("output");
    CanopyDriver.run(config, getTestTempDirPath("testdata"), output,
        manhattanDistanceMeasure, T1, T2, true, 0.0, true);

    assertFinalCenters(output, manhattanCentroids, config);

    long count = HadoopUtil.countRecords(new Path(output, "clusteredPoints/part-m-0"), config);
    assertEquals("number of points", points.size(), count);
  }

  /** Story: User can cluster points using sequential execution */
  @Test
  public void testClusteringEuclideanSeq() throws Exception {
    List<VectorWritable> points = getPointsWritable();
    Configuration config = getConfiguration();
    ClusteringTestUtils.writePointsToFile(points, getTestTempFilePath("testdata/file1"), fs, config);
    // now run the Canopy Driver in sequential mode
    Path output = getTestTempDirPath("output");
    String[] args = { optKey(DefaultOptionCreator.INPUT_OPTION),
        getTestTempDirPath("testdata").toString(),
        optKey(DefaultOptionCreator.OUTPUT_OPTION), output.toString(),
        optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION),
        EuclideanDistanceMeasure.class.getName(),
        optKey(DefaultOptionCreator.T1_OPTION), String.valueOf(T1),
        optKey(DefaultOptionCreator.T2_OPTION), String.valueOf(T2),
        optKey(DefaultOptionCreator.CLUSTERING_OPTION),
        optKey(DefaultOptionCreator.OVERWRITE_OPTION),
        optKey(DefaultOptionCreator.METHOD_OPTION),
        DefaultOptionCreator.SEQUENTIAL_METHOD };
    ToolRunner.run(config, new CanopyDriver(), args);

    assertFinalCenters(output, euclideanCentroids, config);

    long count = HadoopUtil.countRecords(new Path(output, "clusteredPoints/part-m-0"), config);
    assertEquals("number of points", points.size(), count);
  }

  /** Story: User can remove outliers while clustering points using sequential execution */
  @Test
  public void testClusteringEuclideanWithOutlierRemovalSeq() throws Exception {
    List<VectorWritable> points = getPointsWritable();
    Configuration config = getConfiguration();
    ClusteringTestUtils.writePointsToFile(points, getTestTempFilePath("testdata/file1"), fs, config);
    // now run the Canopy Driver in sequential mode
    Path output = getTestTempDirPath("output");
    String[] args = { optKey(DefaultOptionCreator.INPUT_OPTION),
        getTestTempDirPath("testdata").toString(),
        optKey(DefaultOptionCreator.OUTPUT_OPTION), output.toString(),
        optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION),
        EuclideanDistanceMeasure.class.getName(),
        optKey(DefaultOptionCreator.T1_OPTION), String.valueOf(T1),
        optKey(DefaultOptionCreator.T2_OPTION), String.valueOf(T2),
        optKey(DefaultOptionCreator.OUTLIER_THRESHOLD), "0.5",
        optKey(DefaultOptionCreator.CLUSTERING_OPTION),
        optKey(DefaultOptionCreator.OVERWRITE_OPTION),
        optKey(DefaultOptionCreator.METHOD_OPTION),
        DefaultOptionCreator.SEQUENTIAL_METHOD };
    ToolRunner.run(config, new CanopyDriver(), args);

    assertFinalCenters(output, euclideanCentroids, config);

    long count = HadoopUtil.countRecords(new Path(output, "clusteredPoints/part-m-0"), config);
    int expectedPointsHavingPDFGreaterThanThreshold = 6;
    assertEquals("number of points", expectedPointsHavingPDFGreaterThanThreshold, count);
  }

  /**
   * Story: User can produce final point clustering using a Hadoop map/reduce
   * job and a ManhattanDistanceMeasure.
   */
  @Test
  public void testClusteringManhattanMR() throws Exception {
    List<VectorWritable> points = getPointsWritable();
    Configuration conf = getConfiguration();
    ClusteringTestUtils.writePointsToFile(points, true, getTestTempFilePath("testdata/file1"), fs, conf);
    ClusteringTestUtils.writePointsToFile(points, true, getTestTempFilePath("testdata/file2"), fs, conf);
    // now run the Job
    Path output = getTestTempDirPath("output");
    CanopyDriver.run(conf, getTestTempDirPath("testdata"), output,
        manhattanDistanceMeasure, T1, T2, true, 0.0, false);
    long count = HadoopUtil.countRecords(new Path(output, "clusteredPoints/part-m-00000"), conf);
    assertEquals("number of points", points.size(), count);
  }

  /**
   * Story: User can produce final point clustering using a Hadoop map/reduce
   * job and a EuclideanDistanceMeasure.
   */
  @Test
  public void testClusteringEuclideanMR() throws Exception {
    List<VectorWritable> points = getPointsWritable();
    Configuration conf = getConfiguration();
    ClusteringTestUtils.writePointsToFile(points, true, getTestTempFilePath("testdata/file1"), fs, conf);
    ClusteringTestUtils.writePointsToFile(points, true, getTestTempFilePath("testdata/file2"), fs, conf);
    // now run the Job using the run() command. Others can use runJob().
    Path output = getTestTempDirPath("output");
    String[] args = { optKey(DefaultOptionCreator.INPUT_OPTION),
        getTestTempDirPath("testdata").toString(),
        optKey(DefaultOptionCreator.OUTPUT_OPTION), output.toString(),
        optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION),
        EuclideanDistanceMeasure.class.getName(),
        optKey(DefaultOptionCreator.T1_OPTION), String.valueOf(T1),
        optKey(DefaultOptionCreator.T2_OPTION), String.valueOf(T2),
        optKey(DefaultOptionCreator.CLUSTERING_OPTION),
        optKey(DefaultOptionCreator.OVERWRITE_OPTION) };
    // run with the same configuration the input was written with
    ToolRunner.run(conf, new CanopyDriver(), args);
    long count = HadoopUtil.countRecords(new Path(output, "clusteredPoints/part-m-00000"), conf);
    assertEquals("number of points", points.size(), count);
  }

  /**
   * Story: User can produce final point clustering using a Hadoop map/reduce
   * job and a EuclideanDistanceMeasure and outlier removal threshold.
   */
  @Test
  public void testClusteringEuclideanWithOutlierRemovalMR() throws Exception {
    List<VectorWritable> points = getPointsWritable();
    Configuration conf = getConfiguration();
    ClusteringTestUtils.writePointsToFile(points, true, getTestTempFilePath("testdata/file1"), fs, conf);
    ClusteringTestUtils.writePointsToFile(points, true, getTestTempFilePath("testdata/file2"), fs, conf);
    // now run the Job using the run() command. Others can use runJob().
    Path output = getTestTempDirPath("output");
    String[] args = { optKey(DefaultOptionCreator.INPUT_OPTION),
        getTestTempDirPath("testdata").toString(),
        optKey(DefaultOptionCreator.OUTPUT_OPTION), output.toString(),
        optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION),
        EuclideanDistanceMeasure.class.getName(),
        optKey(DefaultOptionCreator.T1_OPTION), String.valueOf(T1),
        optKey(DefaultOptionCreator.T2_OPTION), String.valueOf(T2),
        optKey(DefaultOptionCreator.OUTLIER_THRESHOLD), "0.7",
        optKey(DefaultOptionCreator.CLUSTERING_OPTION),
        optKey(DefaultOptionCreator.OVERWRITE_OPTION) };
    // run with the same configuration the input was written with
    ToolRunner.run(conf, new CanopyDriver(), args);
    long count = HadoopUtil.countRecords(new Path(output, "clusteredPoints/part-m-00000"), conf);
    int expectedPointsAfterOutlierRemoval = 8;
    assertEquals("number of points", expectedPointsAfterOutlierRemoval, count);
  }

  /**
   * Story: User can set T3 and T4 values to be used by the reducer for its T1
   * and T2 thresholds
   */
  @Test
  public void testCanopyReducerT3T4Configuration() throws Exception {
    CanopyReducer reducer = new CanopyReducer();
    Configuration conf = getConfiguration();
    configureCanopy(conf, "org.apache.mahout.common.distance.ManhattanDistanceMeasure", "0");
    conf.set(CanopyConfigKeys.T3_KEY, String.valueOf(1.1));
    conf.set(CanopyConfigKeys.T4_KEY, String.valueOf(0.1));
    DummyRecordWriter<Text, ClusterWritable> writer = new DummyRecordWriter<Text, ClusterWritable>();
    Reducer<Text, VectorWritable, Text, ClusterWritable>.Context context =
        DummyRecordWriter.build(reducer, conf, writer, Text.class, VectorWritable.class);
    reducer.setup(context);
    // T3/T4, when present, replace T1/T2 in the reducer's clusterer
    assertEquals(1.1, reducer.getCanopyClusterer().getT1(), EPSILON);
    assertEquals(0.1, reducer.getCanopyClusterer().getT2(), EPSILON);
  }

  /**
   * Story: User can specify a clustering limit that prevents output of small
   * clusters
   */
  @Test
  public void testCanopyMapperClusterFilter() throws Exception {
    Configuration conf = getConfiguration();
    configureCanopy(conf, manhattanDistanceMeasure.getClass().getName(), "3");
    DummyRecordWriter<Text, VectorWritable> writer = runMapper(conf);
    assertEquals("Number of map results", 1, writer.getData().size());
    // now verify the output
    List<VectorWritable> data = writer.getValue(new Text("centroid"));
    assertEquals("Number of centroids", 2, data.size());
  }

  /**
   * Story: User can specify a cluster filter that limits the minimum size of
   * canopies produced by the reducer
   */
  @Test
  public void testCanopyReducerClusterFilter() throws Exception {
    Configuration conf = getConfiguration();
    configureCanopy(conf, "org.apache.mahout.common.distance.ManhattanDistanceMeasure", "3");
    DummyRecordWriter<Text, ClusterWritable> writer = runReducer(conf);
    Set<Text> keys = writer.getKeys();
    assertEquals("Number of centroids", 2, keys.size());
  }
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/test/java/org/apache/mahout/clustering/classify/ClusterClassificationDriverTest.java ---------------------------------------------------------------------- diff --git a/mr/src/test/java/org/apache/mahout/clustering/classify/ClusterClassificationDriverTest.java b/mr/src/test/java/org/apache/mahout/clustering/classify/ClusterClassificationDriverTest.java new file mode 100644 index 0000000..cbf0e55 --- /dev/null +++ b/mr/src/test/java/org/apache/mahout/clustering/classify/ClusterClassificationDriverTest.java @@ -0,0 +1,255 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.mahout.clustering.classify;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.mahout.clustering.ClusteringTestUtils;
+import org.apache.mahout.clustering.canopy.CanopyDriver;
+import org.apache.mahout.clustering.iterator.CanopyClusteringPolicy;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.distance.ManhattanDistanceMeasure;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.math.NamedVector;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.io.Closeables;
+
+/**
+ * Tests {@link ClusterClassificationDriver}: the reference points are canopy-clustered with a
+ * Manhattan distance measure and then classified against the resulting clusters, both with and
+ * without outlier removal.
+ */
+public class ClusterClassificationDriverTest extends MahoutTestCase {
+
+  /**
+   * Input points. Without outlier removal the tests expect them to be classified into three
+   * clusters of 3, 4 and 2 points.
+   */
+  private static final double[][] REFERENCE = { {1, 1}, {2, 1}, {1, 2}, {4, 4},
+      {5, 4}, {4, 5}, {5, 5}, {9, 9}, {8, 8}};
+
+  private FileSystem fs;
+  private Path clusteringOutputPath;
+  private Configuration conf;
+  private Path pointsPath;
+  private Path classifiedOutputPath;
+  // Classified vectors collected for cluster ids 0, 1 and 2 respectively.
+  private List<Vector> firstCluster;
+  private List<Vector> secondCluster;
+  private List<Vector> thirdCluster;
+
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    // Initialise the field itself (not a shadowing local) so conf is never null in the helpers.
+    conf = getConfiguration();
+    fs = FileSystem.get(conf);
+    firstCluster = Lists.newArrayList();
+    secondCluster = Lists.newArrayList();
+    thirdCluster = Lists.newArrayList();
+  }
+
+  /** Wraps each row of {@code raw} in a {@link RandomAccessSparseVector} inside a {@link VectorWritable}. */
+  private static List<VectorWritable> getPointsWritable(double[][] raw) {
+    List<VectorWritable> points = Lists.newArrayList();
+    for (double[] fr : raw) {
+      Vector vec = new RandomAccessSparseVector(fr.length);
+      vec.assign(fr);
+      points.add(new VectorWritable(vec));
+    }
+    return points;
+  }
+
+  @Test
+  public void testVectorClassificationWithOutlierRemovalMR() throws Exception {
+    List<VectorWritable> points = getPointsWritable(REFERENCE);
+
+    pointsPath = getTestTempDirPath("points");
+    clusteringOutputPath = getTestTempDirPath("output");
+    classifiedOutputPath = getTestTempDirPath("classifiedClusters");
+
+    // conf must be assigned before it is used to delete stale classification output.
+    conf = getConfiguration();
+    HadoopUtil.delete(conf, classifiedOutputPath);
+
+    ClusteringTestUtils.writePointsToFile(points, true,
+        new Path(pointsPath, "file1"), fs, conf);
+    runClustering(pointsPath, conf, false);
+    runClassificationWithOutlierRemoval(false);
+    collectVectorsForAssertion();
+    assertVectorsWithOutlierRemoval();
+  }
+
+  @Test
+  public void testVectorClassificationWithoutOutlierRemoval() throws Exception {
+    List<VectorWritable> points = getPointsWritable(REFERENCE);
+
+    pointsPath = getTestTempDirPath("points");
+    clusteringOutputPath = getTestTempDirPath("output");
+    classifiedOutputPath = getTestTempDirPath("classify");
+
+    conf = getConfiguration();
+
+    ClusteringTestUtils.writePointsToFile(points,
+        new Path(pointsPath, "file1"), fs, conf);
+    runClustering(pointsPath, conf, true);
+    runClassificationWithoutOutlierRemoval();
+    collectVectorsForAssertion();
+    assertVectorsWithoutOutlierRemoval();
+  }
+
+  @Test
+  public void testVectorClassificationWithOutlierRemoval() throws Exception {
+    List<VectorWritable> points = getPointsWritable(REFERENCE);
+
+    pointsPath = getTestTempDirPath("points");
+    clusteringOutputPath = getTestTempDirPath("output");
+    classifiedOutputPath = getTestTempDirPath("classify");
+
+    conf = getConfiguration();
+
+    ClusteringTestUtils.writePointsToFile(points,
+        new Path(pointsPath, "file1"), fs, conf);
+    runClustering(pointsPath, conf, true);
+    runClassificationWithOutlierRemoval(true);
+    collectVectorsForAssertion();
+    assertVectorsWithOutlierRemoval();
+  }
+
+  /**
+   * Canopy-clusters the points (Manhattan distance, T1 = 3.1, T2 = 2.1) into
+   * {@code clusteringOutputPath} and writes a {@link CanopyClusteringPolicy} next to the final
+   * clusters so the classification driver can load them.
+   */
+  private void runClustering(Path pointsPath, Configuration conf,
+      boolean runSequential) throws IOException, InterruptedException,
+      ClassNotFoundException {
+    CanopyDriver.run(conf, pointsPath, clusteringOutputPath,
+        new ManhattanDistanceMeasure(), 3.1, 2.1, false, 0.0, runSequential);
+    Path finalClustersPath = new Path(clusteringOutputPath, "clusters-0-final");
+    ClusterClassifier.writePolicy(new CanopyClusteringPolicy(),
+        finalClustersPath);
+  }
+
+  /** Runs sequential classification with a threshold of 0.0. */
+  private void runClassificationWithoutOutlierRemoval()
+    throws IOException, InterruptedException, ClassNotFoundException {
+    ClusterClassificationDriver.run(getConfiguration(), pointsPath, clusteringOutputPath, classifiedOutputPath, 0.0, true, true);
+  }
+
+  /** Runs classification with a threshold of 0.73, sequentially or as MapReduce. */
+  private void runClassificationWithOutlierRemoval(boolean runSequential)
+    throws IOException, InterruptedException, ClassNotFoundException {
+    ClusterClassificationDriver.run(getConfiguration(), pointsPath, clusteringOutputPath, classifiedOutputPath, 0.73, true, runSequential);
+  }
+
+  /**
+   * Reads every part file under {@code classifiedOutputPath} and sorts the classified vectors
+   * into the per-cluster lists. Each reader is closed even if reading fails.
+   */
+  private void collectVectorsForAssertion() throws IOException {
+    Path[] partFilePaths = FileUtil.stat2Paths(fs
+        .globStatus(classifiedOutputPath));
+    FileStatus[] listStatus = fs.listStatus(partFilePaths,
+        PathFilters.partFilter());
+    for (FileStatus partFile : listStatus) {
+      SequenceFile.Reader classifiedVectors = new SequenceFile.Reader(fs,
+          partFile.getPath(), conf);
+      try {
+        IntWritable clusterIdAsKey = new IntWritable();
+        WeightedPropertyVectorWritable point = new WeightedPropertyVectorWritable();
+        while (classifiedVectors.next(clusterIdAsKey, point)) {
+          collectVector(clusterIdAsKey.get(), point.getVector());
+        }
+      } finally {
+        Closeables.close(classifiedVectors, true);
+      }
+    }
+  }
+
+  /** Adds {@code vector} to the list for cluster 0, 1 or 2; vectors of any other cluster id are ignored. */
+  private void collectVector(int clusterId, Vector vector) {
+    switch (clusterId) {
+      case 0:
+        firstCluster.add(vector);
+        break;
+      case 1:
+        secondCluster.add(vector);
+        break;
+      case 2:
+        thirdCluster.add(vector);
+        break;
+      default:
+        break;
+    }
+  }
+
+  private void assertVectorsWithOutlierRemoval() {
+    checkClustersWithOutlierRemoval();
+  }
+
+  private void assertVectorsWithoutOutlierRemoval() {
+    assertFirstClusterWithoutOutlierRemoval();
+    assertSecondClusterWithoutOutlierRemoval();
+    assertThirdClusterWithoutOutlierRemoval();
+  }
+
+  private void assertThirdClusterWithoutOutlierRemoval() {
+    Assert.assertEquals(2, thirdCluster.size());
+    for (Vector vector : thirdCluster) {
+      Assert.assertTrue(ArrayUtils.contains(new String[] {"{0:9.0,1:9.0}",
+          "{0:8.0,1:8.0}"}, vector.asFormatString()));
+    }
+  }
+
+  private void assertSecondClusterWithoutOutlierRemoval() {
+    Assert.assertEquals(4, secondCluster.size());
+    for (Vector vector : secondCluster) {
+      Assert.assertTrue(ArrayUtils.contains(new String[] {"{0:4.0,1:4.0}",
+          "{0:5.0,1:4.0}", "{0:4.0,1:5.0}", "{0:5.0,1:5.0}"},
+          vector.asFormatString()));
+    }
+  }
+
+  private void assertFirstClusterWithoutOutlierRemoval() {
+    Assert.assertEquals(3, firstCluster.size());
+    for (Vector vector : firstCluster) {
+      Assert.assertTrue(ArrayUtils.contains(new String[] {"{0:1.0,1:1.0}",
+          "{0:2.0,1:1.0}", "{0:1.0,1:2.0}"}, vector.asFormatString()));
+    }
+  }
+
+  /**
+   * With outlier removal the three cluster lists must hold exactly one empty list and two
+   * singletons, whose vectors are {9,9} and {1,1} (each matched once).
+   */
+  private void checkClustersWithOutlierRemoval() {
+    Set<String> reference = Sets.newHashSet("{0:9.0,1:9.0}", "{0:1.0,1:1.0}");
+
+    List<List<Vector>> clusters = Lists.newArrayList();
+    clusters.add(firstCluster);
+    clusters.add(secondCluster);
+    clusters.add(thirdCluster);
+
+    int singletonCnt = 0;
+    int emptyCnt = 0;
+    for (List<Vector> vList : clusters) {
+      if (vList.isEmpty()) {
+        emptyCnt++;
+      } else {
+        singletonCnt++;
+        assertEquals("expecting only singleton clusters; got size=" + vList.size(), 1, vList.size());
+        String formatted = formatWithoutName(vList.get(0));
+        // remove() returns false when the vector is unexpected or was already matched.
+        Assert.assertTrue("not expecting cluster:" + formatted, reference.remove(formatted));
+      }
+    }
+    Assert.assertEquals("Different number of empty clusters than expected!", 1, emptyCnt);
+    Assert.assertEquals("Different number of singletons than expected!", 2, singletonCnt);
+    Assert.assertEquals("Didn't match all reference clusters!", 0, reference.size());
+  }
+
+  /** Formats {@code vector}, unwrapping a {@link NamedVector} so only its values are compared. */
+  private static String formatWithoutName(Vector vector) {
+    Vector values = vector instanceof NamedVector ? ((NamedVector) vector).getDelegate() : vector;
+    return values.asFormatString();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java
----------------------------------------------------------------------
diff --git a/mr/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java b/mr/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java
new file mode 100644
index 0000000..fc71ecf
--- /dev/null
+++ b/mr/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.fuzzykmeans;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.clustering.ClusteringTestUtils;
+import org.apache.mahout.clustering.kmeans.TestKmeansClustering;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+
+/**
+ * Runs {@link FuzzyKMeansDriver} end to end on the k-means reference points, sequentially and as
+ * MapReduce, once for every initial cluster count from 1 up to the number of points.
+ */
+public final class TestFuzzyKmeansClustering extends MahoutTestCase {
+
+  private FileSystem fs;
+  private final DistanceMeasure measure = new EuclideanDistanceMeasure();
+
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    Configuration conf = getConfiguration();
+    fs = FileSystem.get(conf);
+  }
+
+  /** Returns {@code point} with 0.1 added to every coordinate. */
+  private static Vector tweakValue(Vector point) {
+    return point.plus(0.1);
+  }
+
+  @Test
+  public void testFuzzyKMeansSeqJob() throws Exception {
+    runFuzzyKMeans(true);
+  }
+
+  @Test
+  public void testFuzzyKMeansMRJob() throws Exception {
+    runFuzzyKMeans(false);
+  }
+
+  /**
+   * For every k, seeds k + 1 initial soft clusters, runs the driver through {@link ToolRunner}
+   * and asserts that at least one clustered point was written.
+   *
+   * @param sequential run the sequential implementation instead of MapReduce
+   */
+  private void runFuzzyKMeans(boolean sequential) throws Exception {
+    List<VectorWritable> points = TestKmeansClustering.getPointsWritable(TestKmeansClustering.REFERENCE);
+
+    Path pointsPath = getTestTempDirPath("points");
+    Path clustersPath = getTestTempDirPath("clusters");
+    Configuration conf = getConfiguration();
+    ClusteringTestUtils.writePointsToFile(points, new Path(pointsPath, "file1"), fs, conf);
+
+    String label = sequential ? "testFuzzyKMeansSeqJob" : "testFuzzyKMeansMRJob";
+    for (int k = 0; k < points.size(); k++) {
+      System.out.println(label + " k= " + k);
+      // Seed the first k + 1 points (shifted by tweakValue) as the initial cluster centers.
+      writeInitialClusters(points, k + 1, new Path(clustersPath, "part-00000"), conf);
+
+      Path output = getTestTempDirPath("output" + k);
+      ToolRunner.run(getConfiguration(), new FuzzyKMeansDriver(),
+          buildArgs(pointsPath, clustersPath, output, sequential));
+
+      // Sequential runs write clusteredPoints/part-m-0; MapReduce runs write part-m-00000.
+      String clusteredPoints = sequential ? "clusteredPoints/part-m-0" : "clusteredPoints/part-m-00000";
+      long count = HadoopUtil.countRecords(new Path(output, clusteredPoints), conf);
+      assertTrue(count > 0);
+    }
+  }
+
+  /**
+   * Writes {@code numClusters} soft clusters centred on the first {@code numClusters} points
+   * (each shifted by {@link #tweakValue}) to {@code path}; the writer is always closed.
+   */
+  private void writeInitialClusters(List<VectorWritable> points, int numClusters, Path path,
+      Configuration conf) throws IOException {
+    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path, Text.class, SoftCluster.class);
+    try {
+      for (int i = 0; i < numClusters; i++) {
+        Vector vec = tweakValue(points.get(i).get());
+        SoftCluster cluster = new SoftCluster(vec, i, measure);
+        // add the center so the centroid will be correct upon output
+        cluster.observe(cluster.getCenter(), 1);
+        writer.append(new Text(cluster.getIdentifier()), cluster);
+      }
+    } finally {
+      Closeables.close(writer, false);
+    }
+  }
+
+  /** Builds the FuzzyKMeansDriver command line; adds the sequential method option when requested. */
+  private String[] buildArgs(Path pointsPath, Path clustersPath, Path output, boolean sequential) {
+    List<String> args = Lists.newArrayList(
+        optKey(DefaultOptionCreator.INPUT_OPTION), pointsPath.toString(),
+        optKey(DefaultOptionCreator.CLUSTERS_IN_OPTION), clustersPath.toString(),
+        optKey(DefaultOptionCreator.OUTPUT_OPTION), output.toString(),
+        optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION), EuclideanDistanceMeasure.class.getName(),
+        optKey(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION), "0.001",
+        optKey(DefaultOptionCreator.MAX_ITERATIONS_OPTION), "2",
+        optKey(FuzzyKMeansDriver.M_OPTION), "2.0",
+        optKey(DefaultOptionCreator.CLUSTERING_OPTION),
+        optKey(DefaultOptionCreator.EMIT_MOST_LIKELY_OPTION),
+        optKey(DefaultOptionCreator.OVERWRITE_OPTION));
+    if (sequential) {
+      args.add(optKey(DefaultOptionCreator.METHOD_OPTION));
+      args.add(DefaultOptionCreator.SEQUENTIAL_METHOD);
+    }
+    return args.toArray(new String[args.size()]);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/test/java/org/apache/mahout/clustering/iterator/TestClusterClassifier.java
----------------------------------------------------------------------
diff --git a/mr/src/test/java/org/apache/mahout/clustering/iterator/TestClusterClassifier.java b/mr/src/test/java/org/apache/mahout/clustering/iterator/TestClusterClassifier.java
new file mode 100644
index 0000000..fdcfd64
--- /dev/null
+++ b/mr/src/test/java/org/apache/mahout/clustering/iterator/TestClusterClassifier.java
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.iterator;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.mahout.clustering.AbstractCluster;
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.ClusteringTestUtils;
+import org.apache.mahout.clustering.classify.ClusterClassifier;
+import org.apache.mahout.clustering.fuzzykmeans.SoftCluster;
+import org.apache.mahout.clustering.kmeans.Kluster;
+import org.apache.mahout.clustering.kmeans.TestKmeansClustering;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.distance.CosineDistanceMeasure;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.distance.ManhattanDistanceMeasure;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Tests {@link ClusterClassifier}: classification pdfs, sequence-file round trips, and iteration
+ * in memory, over sequence files and as MapReduce, for several cluster model types.
+ */
+public final class TestClusterClassifier extends MahoutTestCase {
+
+  /** Three Manhattan-distance clusters centred at (1,1), (0,0) and (-1,-1). */
+  private static ClusterClassifier newDMClassifier() {
+    List<Cluster> models = Lists.newArrayList();
+    DistanceMeasure measure = new ManhattanDistanceMeasure();
+    models.add(new DistanceMeasureCluster(new DenseVector(2).assign(1), 0, measure));
+    models.add(new DistanceMeasureCluster(new DenseVector(2), 1, measure));
+    models.add(new DistanceMeasureCluster(new DenseVector(2).assign(-1), 2, measure));
+    return new ClusterClassifier(models, new KMeansClusteringPolicy());
+  }
+
+  private static ClusterClassifier newKlusterClassifier() {
+    return newKlusterClassifier(new ManhattanDistanceMeasure());
+  }
+
+  private static ClusterClassifier newCosineKlusterClassifier() {
+    return newKlusterClassifier(new CosineDistanceMeasure());
+  }
+
+  /** Three k-means clusters centred at (1,1), (0,0) and (-1,-1), all sharing {@code measure}. */
+  private static ClusterClassifier newKlusterClassifier(DistanceMeasure measure) {
+    List<Cluster> models = Lists.newArrayList();
+    models.add(new Kluster(new DenseVector(2).assign(1), 0, measure));
+    models.add(new Kluster(new DenseVector(2), 1, measure));
+    models.add(new Kluster(new DenseVector(2).assign(-1), 2, measure));
+    return new ClusterClassifier(models, new KMeansClusteringPolicy());
+  }
+
+  /** Three Manhattan-distance soft clusters centred at (1,1), (0,0) and (-1,-1). */
+  private static ClusterClassifier newSoftClusterClassifier() {
+    List<Cluster> models = Lists.newArrayList();
+    DistanceMeasure measure = new ManhattanDistanceMeasure();
+    models.add(new SoftCluster(new DenseVector(2).assign(1), 0, measure));
+    models.add(new SoftCluster(new DenseVector(2), 1, measure));
+    models.add(new SoftCluster(new DenseVector(2).assign(-1), 2, measure));
+    return new ClusterClassifier(models, new FuzzyKMeansClusteringPolicy());
+  }
+
+  /** Writes {@code classifier} to sequence files under the test temp dir and reads it back. */
+  private ClusterClassifier writeAndRead(ClusterClassifier classifier) throws IOException {
+    Path path = new Path(getTestTempDirPath(), "output");
+    classifier.writeToSeqFiles(path);
+    ClusterClassifier newClassifier = new ClusterClassifier();
+    newClassifier.readFromSeqFiles(getConfiguration(), path);
+    return newClassifier;
+  }
+
+  @Test
+  public void testDMClusterClassification() {
+    ClusterClassifier classifier = newDMClassifier();
+    Vector pdf = classifier.classify(new DenseVector(2));
+    assertEquals("[0,0]", "[0.2,0.6,0.2]", AbstractCluster.formatVector(pdf, null));
+    pdf = classifier.classify(new DenseVector(2).assign(2));
+    assertEquals("[2,2]", "[0.493,0.296,0.211]", AbstractCluster.formatVector(pdf, null));
+  }
+
+  @Test
+  public void testClusterClassification() {
+    ClusterClassifier classifier = newKlusterClassifier();
+    Vector pdf = classifier.classify(new DenseVector(2));
+    assertEquals("[0,0]", "[0.2,0.6,0.2]", AbstractCluster.formatVector(pdf, null));
+    pdf = classifier.classify(new DenseVector(2).assign(2));
+    assertEquals("[2,2]", "[0.493,0.296,0.211]", AbstractCluster.formatVector(pdf, null));
+  }
+
+  @Test
+  public void testSoftClusterClassification() {
+    ClusterClassifier classifier = newSoftClusterClassifier();
+    Vector pdf = classifier.classify(new DenseVector(2));
+    assertEquals("[0,0]", "[0.0,1.0,0.0]", AbstractCluster.formatVector(pdf, null));
+    pdf = classifier.classify(new DenseVector(2).assign(2));
+    assertEquals("[2,2]", "[0.735,0.184,0.082]", AbstractCluster.formatVector(pdf, null));
+  }
+
+  @Test
+  public void testDMClassifierSerialization() throws Exception {
+    assertSurvivesRoundTrip(newDMClassifier());
+  }
+
+  @Test
+  public void testClusterClassifierSerialization() throws Exception {
+    assertSurvivesRoundTrip(newKlusterClassifier());
+  }
+
+  @Test
+  public void testSoftClusterClassifierSerialization() throws Exception {
+    assertSurvivesRoundTrip(newSoftClusterClassifier());
+  }
+
+  /** Round-trips {@code classifier} and checks the model count and first model's class survive. */
+  private void assertSurvivesRoundTrip(ClusterClassifier classifier) throws IOException {
+    ClusterClassifier classifierOut = writeAndRead(classifier);
+    assertEquals(classifier.getModels().size(), classifierOut.getModels().size());
+    assertEquals(classifier.getModels().get(0).getClass().getName(),
+        classifierOut.getModels().get(0).getClass().getName());
+  }
+
+  @Test
+  public void testClusterIteratorKMeans() {
+    assertInMemoryIterationKeepsThreeModels();
+  }
+
+  // NOTE(review): identical to testClusterIteratorKMeans -- it uses the k-means prior despite its name.
+  @Test
+  public void testClusterIteratorDirichlet() {
+    assertInMemoryIterationKeepsThreeModels();
+  }
+
+  /** Runs five in-memory iterations from the Manhattan k-means prior and checks three models remain. */
+  private void assertInMemoryIterationKeepsThreeModels() {
+    List<Vector> data = TestKmeansClustering.getPoints(TestKmeansClustering.REFERENCE);
+    ClusterClassifier prior = newKlusterClassifier();
+    ClusterClassifier posterior = ClusterIterator.iterate(data, prior, 5);
+    assertEquals(3, posterior.getModels().size());
+    printModels(posterior);
+  }
+
+  @Test
+  public void testSeqFileClusterIteratorKMeans() throws IOException {
+    Path pointsPath = getTestTempDirPath("points");
+    Path priorPath = getTestTempDirPath("prior");
+    Path outPath = getTestTempDirPath("output");
+    Configuration conf = getConfiguration();
+    Path path = writePointsAndPrior(conf, pointsPath, priorPath);
+    ClusterIterator.iterateSeq(conf, pointsPath, path, outPath, 5);
+    assertIterationOutput(conf, outPath);
+  }
+
+  @Test
+  public void testMRFileClusterIteratorKMeans() throws Exception {
+    Path pointsPath = getTestTempDirPath("points");
+    Path priorPath = getTestTempDirPath("prior");
+    Path outPath = getTestTempDirPath("output");
+    Configuration conf = getConfiguration();
+    Path path = writePointsAndPrior(conf, pointsPath, priorPath);
+    ClusteringPolicy policy = new KMeansClusteringPolicy();
+    ClusterClassifier.writePolicy(policy, path);
+    ClusterIterator.iterateMR(conf, pointsPath, path, outPath, 5);
+    assertIterationOutput(conf, outPath);
+  }
+
+  /**
+   * Writes the k-means reference points to {@code pointsPath/file1} and the Manhattan k-means
+   * prior to {@code priorPath/priorClassifier}, printing the prior.
+   *
+   * @return the directory holding the prior classifier
+   */
+  private Path writePointsAndPrior(Configuration conf, Path pointsPath, Path priorPath) throws IOException {
+    FileSystem fs = FileSystem.get(pointsPath.toUri(), conf);
+    List<VectorWritable> points = TestKmeansClustering.getPointsWritable(TestKmeansClustering.REFERENCE);
+    ClusteringTestUtils.writePointsToFile(points, new Path(pointsPath, "file1"), fs, conf);
+    Path path = new Path(priorPath, "priorClassifier");
+    ClusterClassifier prior = newKlusterClassifier();
+    prior.writeToSeqFiles(path);
+    assertEquals(3, prior.getModels().size());
+    System.out.println("Prior");
+    printModels(prior);
+    return path;
+  }
+
+  /** Reads the classifiers of iterations 1-4 (the 4th is "-final") and checks each has three models. */
+  private void assertIterationOutput(Configuration conf, Path outPath) throws IOException {
+    for (int i = 1; i <= 4; i++) {
+      System.out.println("Classifier-" + i);
+      ClusterClassifier posterior = new ClusterClassifier();
+      String name = i == 4 ? "clusters-4-final" : "clusters-" + i;
+      posterior.readFromSeqFiles(conf, new Path(outPath, name));
+      assertEquals(3, posterior.getModels().size());
+      printModels(posterior);
+    }
+  }
+
+  /** Prints every model of {@code classifier} to stdout. */
+  private static void printModels(ClusterClassifier classifier) {
+    for (Cluster cluster : classifier.getModels()) {
+      System.out.println(cluster.asFormatString(null));
+    }
+  }
+
+  @Test
+  public void testCosineKlusterClassification() {
+    ClusterClassifier classifier = newCosineKlusterClassifier();
+    Vector pdf = classifier.classify(new DenseVector(2));
+    assertEquals("[0,0]", "[0.333,0.333,0.333]", AbstractCluster.formatVector(pdf, null));
+    pdf = classifier.classify(new DenseVector(2).assign(2));
+    assertEquals("[2,2]", "[0.429,0.429,0.143]", AbstractCluster.formatVector(pdf, null));
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java
----------------------------------------------------------------------
diff --git a/mr/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java b/mr/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java
new file mode 100644
index 0000000..5666765
--- /dev/null
+++ b/mr/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java
@@ -0,0 +1,385 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.clustering.kmeans; + +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.util.ToolRunner; +import org.apache.mahout.clustering.ClusteringTestUtils; +import org.apache.mahout.clustering.canopy.CanopyDriver; +import org.apache.mahout.clustering.classify.WeightedPropertyVectorWritable; +import org.apache.mahout.clustering.iterator.ClusterWritable; +import org.apache.mahout.common.DummyOutputCollector; +import org.apache.mahout.common.MahoutTestCase; +import org.apache.mahout.common.Pair; +import org.apache.mahout.common.commandline.DefaultOptionCreator; +import org.apache.mahout.common.distance.DistanceMeasure; +import org.apache.mahout.common.distance.EuclideanDistanceMeasure; +import org.apache.mahout.common.distance.ManhattanDistanceMeasure; +import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable; +import org.apache.mahout.math.DenseVector; +import org.apache.mahout.math.RandomAccessSparseVector; +import org.apache.mahout.math.SequentialAccessSparseVector; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorWritable; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.Lists; +import com.google.common.io.Closeables; + +public final class TestKmeansClustering extends MahoutTestCase { + + public static final double[][] REFERENCE = { {1, 1}, {2, 1}, {1, 2}, {2, 2}, {3, 3}, {4, 4}, {5, 4}, {4, 5}, {5, 5}}; + + private static final int[][] EXPECTED_NUM_POINTS = { {9}, {4, 5}, {4, 4, 1}, {1, 2, 1, 5}, {1, 1, 1, 2, 4}, + {1, 1, 1, 1, 1, 4}, {1, 1, 1, 1, 1, 2, 2}, {1, 1, 1, 1, 1, 
1, 2, 1}, {1, 1, 1, 1, 1, 1, 1, 1, 1}}; + + private FileSystem fs; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + Configuration conf = getConfiguration(); + fs = FileSystem.get(conf); + } + + public static List<VectorWritable> getPointsWritable(double[][] raw) { + List<VectorWritable> points = Lists.newArrayList(); + for (double[] fr : raw) { + Vector vec = new RandomAccessSparseVector(fr.length); + vec.assign(fr); + points.add(new VectorWritable(vec)); + } + return points; + } + + public static List<VectorWritable> getPointsWritableDenseVector(double[][] raw) { + List<VectorWritable> points = Lists.newArrayList(); + for (double[] fr : raw) { + Vector vec = new DenseVector(fr.length); + vec.assign(fr); + points.add(new VectorWritable(vec)); + } + return points; + } + + public static List<Vector> getPoints(double[][] raw) { + List<Vector> points = Lists.newArrayList(); + for (double[] fr : raw) { + Vector vec = new SequentialAccessSparseVector(fr.length); + vec.assign(fr); + points.add(vec); + } + return points; + } + + /** + * Tests + * {@link KMeansClusterer#runKMeansIteration(Iterable, Iterable, DistanceMeasure, double)} + * ) single run convergence with a given distance threshold. + */ + /*@Test + public void testRunKMeansIterationConvergesInOneRunWithGivenDistanceThreshold() { + double[][] rawPoints = { {0, 0}, {0, 0.25}, {0, 0.75}, {0, 1}}; + List<Vector> points = getPoints(rawPoints); + + ManhattanDistanceMeasure distanceMeasure = new ManhattanDistanceMeasure(); + List<Kluster> clusters = Arrays.asList(new Kluster(points.get(0), 0, distanceMeasure), new Kluster(points.get(3), + 3, distanceMeasure)); + + // To converge in a single run, the given distance threshold should be + // greater than or equal to 0.125, + // since 0.125 will be the distance between center and centroid for the + // initial two clusters after one run. 
+ double distanceThreshold = 0.25; + + boolean converged = KMeansClusterer.runKMeansIteration(points, clusters, distanceMeasure, distanceThreshold); + + Vector cluster1Center = clusters.get(0).getCenter(); + assertEquals(0, cluster1Center.get(0), EPSILON); + assertEquals(0.125, cluster1Center.get(1), EPSILON); + + Vector cluster2Center = clusters.get(1).getCenter(); + assertEquals(0, cluster2Center.get(0), EPSILON); + assertEquals(0.875, cluster2Center.get(1), EPSILON); + + assertTrue("KMeans iteration should be converged after a single run", converged); + }*/ + + /** Story: User wishes to run kmeans job on reference data */ + @Test + public void testKMeansSeqJob() throws Exception { + DistanceMeasure measure = new EuclideanDistanceMeasure(); + List<VectorWritable> points = getPointsWritable(REFERENCE); + + Path pointsPath = getTestTempDirPath("points"); + Path clustersPath = getTestTempDirPath("clusters"); + Configuration conf = getConfiguration(); + ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file1"), fs, conf); + ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file2"), fs, conf); + for (int k = 1; k < points.size(); k++) { + System.out.println("testKMeansMRJob k= " + k); + // pick k initial cluster centers at random + Path path = new Path(clustersPath, "part-00000"); + FileSystem fs = FileSystem.get(path.toUri(), conf); + SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path, Text.class, Kluster.class); + try { + for (int i = 0; i < k + 1; i++) { + Vector vec = points.get(i).get(); + + Kluster cluster = new Kluster(vec, i, measure); + // add the center so the centroid will be correct upon output + cluster.observe(cluster.getCenter(), 1); + writer.append(new Text(cluster.getIdentifier()), cluster); + } + } finally { + Closeables.close(writer, false); + } + // now run the Job + Path outputPath = getTestTempDirPath("output" + k); + String[] args = {optKey(DefaultOptionCreator.INPUT_OPTION), 
pointsPath.toString(), + optKey(DefaultOptionCreator.CLUSTERS_IN_OPTION), clustersPath.toString(), + optKey(DefaultOptionCreator.OUTPUT_OPTION), outputPath.toString(), + optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION), EuclideanDistanceMeasure.class.getName(), + optKey(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION), "0.001", + optKey(DefaultOptionCreator.MAX_ITERATIONS_OPTION), "2", optKey(DefaultOptionCreator.CLUSTERING_OPTION), + optKey(DefaultOptionCreator.OVERWRITE_OPTION), optKey(DefaultOptionCreator.METHOD_OPTION), + DefaultOptionCreator.SEQUENTIAL_METHOD}; + ToolRunner.run(conf, new KMeansDriver(), args); + + // now compare the expected clusters with actual + Path clusteredPointsPath = new Path(outputPath, "clusteredPoints"); + int[] expect = EXPECTED_NUM_POINTS[k]; + DummyOutputCollector<IntWritable,WeightedPropertyVectorWritable> collector = new DummyOutputCollector<IntWritable,WeightedPropertyVectorWritable>(); + // The key is the clusterId, the value is the weighted vector + for (Pair<IntWritable,WeightedPropertyVectorWritable> record : new SequenceFileIterable<IntWritable,WeightedPropertyVectorWritable>( + new Path(clusteredPointsPath, "part-m-0"), conf)) { + collector.collect(record.getFirst(), record.getSecond()); + } + assertEquals("clusters[" + k + ']', expect.length, collector.getKeys().size()); + } + } + + /** Story: User wishes to run kmeans job on reference data (DenseVector test) */ + @Test + public void testKMeansSeqJobDenseVector() throws Exception { + DistanceMeasure measure = new EuclideanDistanceMeasure(); + List<VectorWritable> points = getPointsWritableDenseVector(REFERENCE); + + Path pointsPath = getTestTempDirPath("points"); + Path clustersPath = getTestTempDirPath("clusters"); + Configuration conf = getConfiguration(); + ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file1"), fs, conf); + ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file2"), fs, conf); + for (int k = 1; k < 
points.size(); k++) { + System.out.println("testKMeansMRJob k= " + k); + // pick k initial cluster centers at random + Path path = new Path(clustersPath, "part-00000"); + FileSystem fs = FileSystem.get(path.toUri(), conf); + SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path, Text.class, Kluster.class); + try { + for (int i = 0; i < k + 1; i++) { + Vector vec = points.get(i).get(); + + Kluster cluster = new Kluster(vec, i, measure); + // add the center so the centroid will be correct upon output + cluster.observe(cluster.getCenter(), 1); + writer.append(new Text(cluster.getIdentifier()), cluster); + } + } finally { + Closeables.close(writer, false); + } + // now run the Job + Path outputPath = getTestTempDirPath("output" + k); + String[] args = {optKey(DefaultOptionCreator.INPUT_OPTION), pointsPath.toString(), + optKey(DefaultOptionCreator.CLUSTERS_IN_OPTION), clustersPath.toString(), + optKey(DefaultOptionCreator.OUTPUT_OPTION), outputPath.toString(), + optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION), EuclideanDistanceMeasure.class.getName(), + optKey(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION), "0.001", + optKey(DefaultOptionCreator.MAX_ITERATIONS_OPTION), "2", optKey(DefaultOptionCreator.CLUSTERING_OPTION), + optKey(DefaultOptionCreator.OVERWRITE_OPTION), optKey(DefaultOptionCreator.METHOD_OPTION), + DefaultOptionCreator.SEQUENTIAL_METHOD}; + ToolRunner.run(conf, new KMeansDriver(), args); + + // now compare the expected clusters with actual + Path clusteredPointsPath = new Path(outputPath, "clusteredPoints"); + int[] expect = EXPECTED_NUM_POINTS[k]; + DummyOutputCollector<IntWritable,WeightedPropertyVectorWritable> collector = new DummyOutputCollector<IntWritable,WeightedPropertyVectorWritable>(); + // The key is the clusterId, the value is the weighted vector + for (Pair<IntWritable,WeightedPropertyVectorWritable> record : new SequenceFileIterable<IntWritable,WeightedPropertyVectorWritable>( + new Path(clusteredPointsPath, "part-m-0"), 
conf)) { + collector.collect(record.getFirst(), record.getSecond()); + } + assertEquals("clusters[" + k + ']', expect.length, collector.getKeys().size()); + } + } + + /** Story: User wishes to run kmeans job on reference data */ + @Test + public void testKMeansMRJob() throws Exception { + DistanceMeasure measure = new EuclideanDistanceMeasure(); + List<VectorWritable> points = getPointsWritable(REFERENCE); + + Path pointsPath = getTestTempDirPath("points"); + Path clustersPath = getTestTempDirPath("clusters"); + Configuration conf = getConfiguration(); + ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file1"), fs, conf); + ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file2"), fs, conf); + for (int k = 1; k < points.size(); k += 3) { + System.out.println("testKMeansMRJob k= " + k); + // pick k initial cluster centers at random + Path path = new Path(clustersPath, "part-00000"); + FileSystem fs = FileSystem.get(path.toUri(), conf); + SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path, Text.class, Kluster.class); + + try { + for (int i = 0; i < k + 1; i++) { + Vector vec = points.get(i).get(); + + Kluster cluster = new Kluster(vec, i, measure); + // add the center so the centroid will be correct upon output + cluster.observe(cluster.getCenter(), 1); + writer.append(new Text(cluster.getIdentifier()), cluster); + } + } finally { + Closeables.close(writer, false); + } + // now run the Job + Path outputPath = getTestTempDirPath("output" + k); + String[] args = {optKey(DefaultOptionCreator.INPUT_OPTION), pointsPath.toString(), + optKey(DefaultOptionCreator.CLUSTERS_IN_OPTION), clustersPath.toString(), + optKey(DefaultOptionCreator.OUTPUT_OPTION), outputPath.toString(), + optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION), EuclideanDistanceMeasure.class.getName(), + optKey(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION), "0.001", + optKey(DefaultOptionCreator.MAX_ITERATIONS_OPTION), "2", 
optKey(DefaultOptionCreator.CLUSTERING_OPTION), + optKey(DefaultOptionCreator.OVERWRITE_OPTION)}; + ToolRunner.run(getConfiguration(), new KMeansDriver(), args); + + // now compare the expected clusters with actual + Path clusteredPointsPath = new Path(outputPath, "clusteredPoints"); + // assertEquals("output dir files?", 4, outFiles.length); + int[] expect = EXPECTED_NUM_POINTS[k]; + DummyOutputCollector<IntWritable,WeightedPropertyVectorWritable> collector = new DummyOutputCollector<IntWritable,WeightedPropertyVectorWritable>(); + // The key is the clusterId, the value is the weighted vector + for (Pair<IntWritable,WeightedPropertyVectorWritable> record : new SequenceFileIterable<IntWritable,WeightedPropertyVectorWritable>( + new Path(clusteredPointsPath, "part-m-00000"), conf)) { + collector.collect(record.getFirst(), record.getSecond()); + } + assertEquals("clusters[" + k + ']', expect.length, collector.getKeys().size()); + } + } + + /** + * Story: User wants to use canopy clustering to input the initial clusters + * for kmeans job. 
+ */ + @Test + public void testKMeansWithCanopyClusterInput() throws Exception { + List<VectorWritable> points = getPointsWritable(REFERENCE); + + Path pointsPath = getTestTempDirPath("points"); + Configuration conf = getConfiguration(); + ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file1"), fs, conf); + ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file2"), fs, conf); + + Path outputPath = getTestTempDirPath("output"); + // now run the Canopy job + CanopyDriver.run(conf, pointsPath, outputPath, new ManhattanDistanceMeasure(), 3.1, 2.1, false, 0.0, false); + + DummyOutputCollector<Text, ClusterWritable> collector1 = + new DummyOutputCollector<Text, ClusterWritable>(); + + FileStatus[] outParts = FileSystem.get(conf).globStatus( + new Path(outputPath, "clusters-0-final/*-0*")); + for (FileStatus outPartStat : outParts) { + for (Pair<Text,ClusterWritable> record : + new SequenceFileIterable<Text,ClusterWritable>( + outPartStat.getPath(), conf)) { + collector1.collect(record.getFirst(), record.getSecond()); + } + } + + boolean got15 = false; + boolean got43 = false; + int count = 0; + for (Text k : collector1.getKeys()) { + count++; + List<ClusterWritable> vl = collector1.getValue(k); + assertEquals("non-singleton centroid!", 1, vl.size()); + ClusterWritable clusterWritable = vl.get(0); + Vector v = clusterWritable.getValue().getCenter(); + assertEquals("cetriod vector is wrong length", 2, v.size()); + if ( (Math.abs(v.get(0) - 1.5) < EPSILON) + && (Math.abs(v.get(1) - 1.5) < EPSILON) + && !got15) { + got15 = true; + } else if ( (Math.abs(v.get(0) - 4.333333333333334) < EPSILON) + && (Math.abs(v.get(1) - 4.333333333333334) < EPSILON) + && !got43) { + got43 = true; + } else { + fail("got unexpected center: " + v + " [" + v.getClass().toString() + ']'); + } + } + assertEquals("got unexpected number of centers", 2, count); + + // now run the KMeans job + Path kmeansOutput = new Path(outputPath, "kmeans"); + 
KMeansDriver.run(getConfiguration(), pointsPath, new Path(outputPath, "clusters-0-final"), kmeansOutput, + 0.001, 10, true, 0.0, false); + + // now compare the expected clusters with actual + Path clusteredPointsPath = new Path(kmeansOutput, "clusteredPoints"); + DummyOutputCollector<IntWritable,WeightedPropertyVectorWritable> collector = new DummyOutputCollector<IntWritable,WeightedPropertyVectorWritable>(); + + // The key is the clusterId, the value is the weighted vector + for (Pair<IntWritable,WeightedPropertyVectorWritable> record : new SequenceFileIterable<IntWritable,WeightedPropertyVectorWritable>( + new Path(clusteredPointsPath, "part-m-00000"), conf)) { + collector.collect(record.getFirst(), record.getSecond()); + } + + for (IntWritable k : collector.getKeys()) { + List<WeightedPropertyVectorWritable> wpvList = collector.getValue(k); + assertTrue("empty cluster!", !wpvList.isEmpty()); + if (wpvList.get(0).getVector().get(0) <= 2.0) { + for (WeightedPropertyVectorWritable wv : wpvList) { + Vector v = wv.getVector(); + int idx = v.maxValueIndex(); + assertTrue("bad cluster!", v.get(idx) <= 2.0); + } + assertEquals("Wrong size cluster", 4, wpvList.size()); + } else { + for (WeightedPropertyVectorWritable wv : wpvList) { + Vector v = wv.getVector(); + int idx = v.minValueIndex(); + assertTrue("bad cluster!", v.get(idx) > 2.0); + } + assertEquals("Wrong size cluster", 5, wpvList.size()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/test/java/org/apache/mahout/clustering/kmeans/TestRandomSeedGenerator.java ---------------------------------------------------------------------- diff --git a/mr/src/test/java/org/apache/mahout/clustering/kmeans/TestRandomSeedGenerator.java b/mr/src/test/java/org/apache/mahout/clustering/kmeans/TestRandomSeedGenerator.java new file mode 100644 index 0000000..5cb012a --- /dev/null +++ b/mr/src/test/java/org/apache/mahout/clustering/kmeans/TestRandomSeedGenerator.java @@ -0,0 +1,169 @@ + /** + * 
Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.kmeans;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+import com.google.common.collect.Sets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.ClusteringTestUtils;
+import org.apache.mahout.clustering.iterator.ClusterWritable;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.distance.ManhattanDistanceMeasure;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterable;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Tests for {@link RandomSeedGenerator}: the generated seed clusters are centred on input points,
+ * carry unique ids, and come back in the same order when the same random seed is supplied.
+ */
+public final class TestRandomSeedGenerator extends MahoutTestCase {
+
+  private static final double[][] RAW = {{1, 1}, {2, 1}, {1, 2}, {2, 2},
+    {3, 3}, {4, 4}, {5, 4}, {4, 5}, {5, 5}};
+
+  /** Number of seed clusters every test asks RandomSeedGenerator to build. */
+  private static final int NUM_SEEDS = 4;
+
+  /** Name of the file the tests read from RandomSeedGenerator's output directory. */
+  private static final String SEED_FILE = "part-randomSeed";
+
+  private FileSystem fs;
+
+  private static List<VectorWritable> getPoints() {
+    List<VectorWritable> points = Lists.newArrayList();
+    for (double[] fr : RAW) {
+      Vector vec = new RandomAccessSparseVector(fr.length);
+      vec.assign(fr);
+      points.add(new VectorWritable(vec));
+    }
+    return points;
+  }
+
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    Configuration conf = getConfiguration();
+    fs = FileSystem.get(conf);
+  }
+
+  /**
+   * Returns the configuration of a fresh {@link Job} whose map output value class has been set
+   * to {@link VectorWritable}; every test in this class prepares its configuration this way.
+   */
+  private static Configuration newJobConfiguration() throws IOException {
+    // NOTE(review): new Job() is deprecated in Hadoop 2 in favour of Job.getInstance(); kept
+    // unchanged here so the test stays compatible with the Hadoop version the module targets.
+    Job job = new Job();
+    Configuration conf = job.getConfiguration();
+    job.setMapOutputValueClass(VectorWritable.class);
+    return conf;
+  }
+
+  /**
+   * Reads the seed file under {@code output} and returns the cluster ids in file order. Only the
+   * int ids are kept, so reusing the ClusterWritable instance while iterating is harmless.
+   */
+  private static List<Integer> readSeedIds(Configuration conf, Path output) {
+    List<Integer> ids = Lists.newArrayList();
+    for (ClusterWritable clusterWritable
+        : new SequenceFileValueIterable<ClusterWritable>(new Path(output, SEED_FILE), true, conf)) {
+      ids.add(clusterWritable.getValue().getId());
+    }
+    return ids;
+  }
+
+  /**
+   * Asserts that the seed file under {@code output} holds exactly {@link #NUM_SEEDS} clusters
+   * with distinct ids, each centred on the input point whose index in RAW equals its id.
+   */
+  private static void assertValidSeeds(Configuration conf, Path output) {
+    int clusterCount = 0;
+    Collection<Integer> seenIds = Sets.newHashSet();
+    for (ClusterWritable clusterWritable
+        : new SequenceFileValueIterable<ClusterWritable>(new Path(output, SEED_FILE), true, conf)) {
+      clusterCount++;
+      Cluster cluster = clusterWritable.getValue();
+      int id = cluster.getId();
+      assertTrue(seenIds.add(id)); // validate unique ids
+
+      Vector v = cluster.getCenter();
+      assertVectorEquals(RAW[id], v); // validate values match
+    }
+
+    assertEquals(NUM_SEEDS, clusterCount); // validate sample count
+  }
+
+  /** Story: test random seed generation generates 4 clusters with proper ids and data */
+  @Test
+  public void testRandomSeedGenerator() throws Exception {
+    List<VectorWritable> points = getPoints();
+    Configuration conf = newJobConfiguration();
+    Path input = getTestTempFilePath("random-input");
+    Path output = getTestTempDirPath("random-output");
+    ClusteringTestUtils.writePointsToFile(points, input, fs, conf);
+
+    RandomSeedGenerator.buildRandom(conf, input, output, NUM_SEEDS, new ManhattanDistanceMeasure());
+
+    assertValidSeeds(conf, output);
+  }
+
+  /** Be sure that the buildRandomSeeded works in the same way as RandomSeedGenerator.buildRandom */
+  @Test
+  public void testRandomSeedGeneratorSeeded() throws Exception {
+    List<VectorWritable> points = getPoints();
+    Configuration conf = newJobConfiguration();
+    Path input = getTestTempFilePath("random-input");
+    Path output = getTestTempDirPath("random-output");
+    ClusteringTestUtils.writePointsToFile(points, input, fs, conf);
+
+    RandomSeedGenerator.buildRandom(conf, input, output, NUM_SEEDS, new ManhattanDistanceMeasure(), 1L);
+
+    assertValidSeeds(conf, output);
+  }
+
+  /** Test that initial clusters built with same random seed are reproduced */
+  @Test
+  public void testBuildRandomSeededSameInitalClusters() throws Exception {
+    List<VectorWritable> points = getPoints();
+    Configuration conf = newJobConfiguration();
+    Path input = getTestTempFilePath("random-input");
+    Path output = getTestTempDirPath("random-output");
+    ClusteringTestUtils.writePointsToFile(points, input, fs, conf);
+    long randSeed = 1;
+
+    RandomSeedGenerator.buildRandom(conf, input, output, NUM_SEEDS, new ManhattanDistanceMeasure(), randSeed);
+
+    // Record the id sequence of the first run; it must be complete before it can serve as the
+    // reference (previously a short run went unnoticed and a long one hit an index error).
+    List<Integer> firstRunIds = readSeedIds(conf, output);
+    assertEquals(NUM_SEEDS, firstRunIds.size());
+
+    // Rebuild the seeds with the same random seed and require the identical, complete id
+    // sequence (previously only a prefix of the second run was compared).
+    // NOTE(review): the original author's note reads "passes when seeded with 1 and 2, fails
+    // with 1, 3" - presumably meaning that on this tiny data set some different seeds also
+    // produce the same order, so this check alone does not prove the seed is honoured.
+    RandomSeedGenerator.buildRandom(conf, input, output, NUM_SEEDS, new ManhattanDistanceMeasure(), randSeed);
+    assertEquals(firstRunIds, readSeedIds(conf, output));
+  }
+
+  private static void assertVectorEquals(double[] raw, Vector v) {
+    assertEquals(raw.length, v.size());
+    for (int i = 0; i < raw.length; i++) {
+      assertEquals(raw[i], v.getQuick(i), EPSILON);
+    }
+  }
+}
