http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java new file mode 100644 index 0000000..fc71ecf --- /dev/null +++ b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/fuzzykmeans/TestFuzzyKmeansClustering.java @@ -0,0 +1,202 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.clustering.fuzzykmeans; + +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.util.ToolRunner; +import org.apache.mahout.clustering.ClusteringTestUtils; +import org.apache.mahout.clustering.kmeans.TestKmeansClustering; +import org.apache.mahout.common.HadoopUtil; +import org.apache.mahout.common.MahoutTestCase; +import org.apache.mahout.common.commandline.DefaultOptionCreator; +import org.apache.mahout.common.distance.DistanceMeasure; +import org.apache.mahout.common.distance.EuclideanDistanceMeasure; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorWritable; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.io.Closeables; + +public final class TestFuzzyKmeansClustering extends MahoutTestCase { + + private FileSystem fs; + private final DistanceMeasure measure = new EuclideanDistanceMeasure(); + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + Configuration conf = getConfiguration(); + fs = FileSystem.get(conf); + } + + private static Vector tweakValue(Vector point) { + return point.plus(0.1); + } + + @Test + public void testFuzzyKMeansSeqJob() throws Exception { + List<VectorWritable> points = TestKmeansClustering.getPointsWritable(TestKmeansClustering.REFERENCE); + + Path pointsPath = getTestTempDirPath("points"); + Path clustersPath = getTestTempDirPath("clusters"); + Configuration conf = getConfiguration(); + ClusteringTestUtils.writePointsToFile(points, new Path(pointsPath, "file1"), fs, conf); + + for (int k = 0; k < points.size(); k++) { + System.out.println("testKFuzzyKMeansMRJob k= " + k); + // pick k initial cluster centers at random + SequenceFile.Writer writer = new SequenceFile.Writer(fs, + conf, + new Path(clustersPath, "part-00000"), + Text.class, + SoftCluster.class); + try { + for (int i = 0; i < k + 1; i++) { + Vector vec = 
tweakValue(points.get(i).get()); + SoftCluster cluster = new SoftCluster(vec, i, measure); + /* add the center so the centroid will be correct upon output */ + cluster.observe(cluster.getCenter(), 1); + // writer.write(cluster.getIdentifier() + '\t' + SoftCluster.formatCluster(cluster) + '\n'); + writer.append(new Text(cluster.getIdentifier()), cluster); + } + } finally { + Closeables.close(writer, false); + } + + // now run the Job using the run() command line options. + Path output = getTestTempDirPath("output" + k); + /* FuzzyKMeansDriver.runJob(pointsPath, + clustersPath, + output, + EuclideanDistanceMeasure.class.getName(), + 0.001, + 2, + k + 1, + 2, + false, + true, + 0); + */ + String[] args = { + optKey(DefaultOptionCreator.INPUT_OPTION), pointsPath.toString(), + optKey(DefaultOptionCreator.CLUSTERS_IN_OPTION), + clustersPath.toString(), + optKey(DefaultOptionCreator.OUTPUT_OPTION), + output.toString(), + optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION), + EuclideanDistanceMeasure.class.getName(), + optKey(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION), + "0.001", + optKey(DefaultOptionCreator.MAX_ITERATIONS_OPTION), + "2", + optKey(FuzzyKMeansDriver.M_OPTION), + "2.0", + optKey(DefaultOptionCreator.CLUSTERING_OPTION), + optKey(DefaultOptionCreator.EMIT_MOST_LIKELY_OPTION), + optKey(DefaultOptionCreator.OVERWRITE_OPTION), + optKey(DefaultOptionCreator.METHOD_OPTION), + DefaultOptionCreator.SEQUENTIAL_METHOD + }; + FuzzyKMeansDriver.main(args); + long count = HadoopUtil.countRecords(new Path(output, "clusteredPoints/part-m-0"), conf); + assertTrue(count > 0); + } + + } + + @Test + public void testFuzzyKMeansMRJob() throws Exception { + List<VectorWritable> points = TestKmeansClustering.getPointsWritable(TestKmeansClustering.REFERENCE); + + Path pointsPath = getTestTempDirPath("points"); + Path clustersPath = getTestTempDirPath("clusters"); + Configuration conf = getConfiguration(); + ClusteringTestUtils.writePointsToFile(points, new Path(pointsPath, "file1"), fs, conf); + + for (int k = 0; k < points.size(); k++) { + System.out.println("testKFuzzyKMeansMRJob k= " + k); + // pick k initial cluster centers at random + SequenceFile.Writer writer = new SequenceFile.Writer(fs, + conf, + new Path(clustersPath, "part-00000"), + Text.class, + SoftCluster.class); + try { + for (int i = 0; i < k + 1; i++) { + Vector vec = tweakValue(points.get(i).get()); + + SoftCluster cluster = new SoftCluster(vec, i, measure); + /* add the center so the centroid will be correct upon output */ + cluster.observe(cluster.getCenter(), 1); + // writer.write(cluster.getIdentifier() + '\t' + SoftCluster.formatCluster(cluster) + '\n'); + writer.append(new Text(cluster.getIdentifier()), cluster); + + } + } finally { + Closeables.close(writer, false); + } + + // now run the Job using the run() command line options. 
+ Path output = getTestTempDirPath("output" + k); + /* FuzzyKMeansDriver.runJob(pointsPath, + clustersPath, + output, + EuclideanDistanceMeasure.class.getName(), + 0.001, + 2, + k + 1, + 2, + false, + true, + 0); + */ + String[] args = { + optKey(DefaultOptionCreator.INPUT_OPTION), + pointsPath.toString(), + optKey(DefaultOptionCreator.CLUSTERS_IN_OPTION), + clustersPath.toString(), + optKey(DefaultOptionCreator.OUTPUT_OPTION), + output.toString(), + optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION), + EuclideanDistanceMeasure.class.getName(), + optKey(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION), + "0.001", + optKey(DefaultOptionCreator.MAX_ITERATIONS_OPTION), + "2", + optKey(FuzzyKMeansDriver.M_OPTION), + "2.0", + optKey(DefaultOptionCreator.CLUSTERING_OPTION), + optKey(DefaultOptionCreator.EMIT_MOST_LIKELY_OPTION), + optKey(DefaultOptionCreator.OVERWRITE_OPTION) + }; + ToolRunner.run(getConfiguration(), new FuzzyKMeansDriver(), args); + long count = HadoopUtil.countRecords(new Path(output, "clusteredPoints/part-m-00000"), conf); + assertTrue(count > 0); + } + + } + +}
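Both tests above drive FuzzyKMeansDriver with the fuzziness option -m set to 2.0 and then only assert that clustered points were written. As a minimal, self-contained sketch of what that option controls (plain Java, illustrative only, not Mahout's SoftCluster code; the class name below is hypothetical): with fuzziness m, the membership weight of point x in cluster i is 1 / sum_k (d_i / d_k)^(2/(m-1)), where d_i is the distance from x to center i.

// Minimal sketch of fuzzy k-means membership weights (not the Mahout implementation).
public final class FuzzyMembershipSketch {

  // Euclidean distance between two points of equal length.
  private static double distance(double[] a, double[] b) {
    double sum = 0.0;
    for (int i = 0; i < a.length; i++) {
      double diff = a[i] - b[i];
      sum += diff * diff;
    }
    return Math.sqrt(sum);
  }

  // Membership of point in each center for fuzziness m > 1:
  // u_i = 1 / sum_k (d_i / d_k)^(2 / (m - 1)).
  static double[] memberships(double[] point, double[][] centers, double m) {
    double exponent = 2.0 / (m - 1.0);
    double[] u = new double[centers.length];
    for (int i = 0; i < centers.length; i++) {
      double di = distance(point, centers[i]);
      double sum = 0.0;
      for (double[] center : centers) {
        double dk = distance(point, center);
        if (dk == 0.0) {
          // A point sitting exactly on a center gets full membership there.
          sum = di == 0.0 ? 1.0 : Double.POSITIVE_INFINITY;
          break;
        }
        sum += Math.pow(di / dk, exponent);
      }
      u[i] = 1.0 / sum;
    }
    return u;
  }

  public static void main(String[] args) {
    double[][] centers = { {1, 1}, {4, 4} };
    double[] point = {2, 2};
    // Prints memberships of roughly [0.8, 0.2] for m = 2.0.
    System.out.println(java.util.Arrays.toString(memberships(point, centers, 2.0)));
  }
}

With m close to 1 the weights approach hard k-means assignments; larger m spreads membership more evenly across clusters.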
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/clustering/iterator/TestClusterClassifier.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/clustering/iterator/TestClusterClassifier.java b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/iterator/TestClusterClassifier.java new file mode 100644 index 0000000..fdcfd64 --- /dev/null +++ b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/iterator/TestClusterClassifier.java @@ -0,0 +1,238 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.clustering.iterator; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.mahout.clustering.AbstractCluster; +import org.apache.mahout.clustering.Cluster; +import org.apache.mahout.clustering.ClusteringTestUtils; +import org.apache.mahout.clustering.classify.ClusterClassifier; +import org.apache.mahout.clustering.fuzzykmeans.SoftCluster; +import org.apache.mahout.clustering.kmeans.TestKmeansClustering; +import org.apache.mahout.common.MahoutTestCase; +import org.apache.mahout.common.distance.CosineDistanceMeasure; +import org.apache.mahout.common.distance.DistanceMeasure; +import org.apache.mahout.common.distance.ManhattanDistanceMeasure; +import org.apache.mahout.math.DenseVector; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorWritable; +import org.junit.Test; + +import com.google.common.collect.Lists; + +public final class TestClusterClassifier extends MahoutTestCase { + + private static ClusterClassifier newDMClassifier() { + List<Cluster> models = Lists.newArrayList(); + DistanceMeasure measure = new ManhattanDistanceMeasure(); + models.add(new DistanceMeasureCluster(new DenseVector(2).assign(1), 0, measure)); + models.add(new DistanceMeasureCluster(new DenseVector(2), 1, measure)); + models.add(new DistanceMeasureCluster(new DenseVector(2).assign(-1), 2, measure)); + return new ClusterClassifier(models, new KMeansClusteringPolicy()); + } + + private static ClusterClassifier newKlusterClassifier() { + List<Cluster> models = Lists.newArrayList(); + DistanceMeasure measure = new ManhattanDistanceMeasure(); + models.add(new org.apache.mahout.clustering.kmeans.Kluster(new DenseVector(2).assign(1), 0, measure)); + models.add(new org.apache.mahout.clustering.kmeans.Kluster(new DenseVector(2), 1, measure)); + models.add(new org.apache.mahout.clustering.kmeans.Kluster(new DenseVector(2).assign(-1), 2, measure)); + return new ClusterClassifier(models, new KMeansClusteringPolicy()); + } 
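  /*
   * A worked sketch of where the expected pdf values in the tests below come from,
   * assuming the k-means policy weighs each cluster by 1 / (1 + distance(point, center))
   * and then normalizes the weights. For the point [0,0] against the Manhattan-distance
   * centers [1,1], [0,0] and [-1,-1] built above:
   *
   *   distances   = 2, 0, 2
   *   raw weights = 1/3, 1, 1/3
   *   normalized  = 0.2, 0.6, 0.2    (the asserted "[0.2,0.6,0.2]")
   *
   * Likewise for [2,2]: distances 2, 4, 6 give weights 1/3, 1/5, 1/7, which normalize
   * to 0.493, 0.296, 0.211.
   */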
+ + private static ClusterClassifier newCosineKlusterClassifier() { + List<Cluster> models = Lists.newArrayList(); + DistanceMeasure measure = new CosineDistanceMeasure(); + models.add(new org.apache.mahout.clustering.kmeans.Kluster(new DenseVector(2).assign(1), 0, measure)); + models.add(new org.apache.mahout.clustering.kmeans.Kluster(new DenseVector(2), 1, measure)); + models.add(new org.apache.mahout.clustering.kmeans.Kluster(new DenseVector(2).assign(-1), 2, measure)); + return new ClusterClassifier(models, new KMeansClusteringPolicy()); + } + + private static ClusterClassifier newSoftClusterClassifier() { + List<Cluster> models = Lists.newArrayList(); + DistanceMeasure measure = new ManhattanDistanceMeasure(); + models.add(new SoftCluster(new DenseVector(2).assign(1), 0, measure)); + models.add(new SoftCluster(new DenseVector(2), 1, measure)); + models.add(new SoftCluster(new DenseVector(2).assign(-1), 2, measure)); + return new ClusterClassifier(models, new FuzzyKMeansClusteringPolicy()); + } + + private ClusterClassifier writeAndRead(ClusterClassifier classifier) throws IOException { + Path path = new Path(getTestTempDirPath(), "output"); + classifier.writeToSeqFiles(path); + ClusterClassifier newClassifier = new ClusterClassifier(); + newClassifier.readFromSeqFiles(getConfiguration(), path); + return newClassifier; + } + + @Test + public void testDMClusterClassification() { + ClusterClassifier classifier = newDMClassifier(); + Vector pdf = classifier.classify(new DenseVector(2)); + assertEquals("[0,0]", "[0.2,0.6,0.2]", AbstractCluster.formatVector(pdf, null)); + pdf = classifier.classify(new DenseVector(2).assign(2)); + assertEquals("[2,2]", "[0.493,0.296,0.211]", AbstractCluster.formatVector(pdf, null)); + } + + @Test + public void testClusterClassification() { + ClusterClassifier classifier = newKlusterClassifier(); + Vector pdf = classifier.classify(new DenseVector(2)); + assertEquals("[0,0]", "[0.2,0.6,0.2]", AbstractCluster.formatVector(pdf, null)); + pdf = classifier.classify(new DenseVector(2).assign(2)); + assertEquals("[2,2]", "[0.493,0.296,0.211]", AbstractCluster.formatVector(pdf, null)); + } + + @Test + public void testSoftClusterClassification() { + ClusterClassifier classifier = newSoftClusterClassifier(); + Vector pdf = classifier.classify(new DenseVector(2)); + assertEquals("[0,0]", "[0.0,1.0,0.0]", AbstractCluster.formatVector(pdf, null)); + pdf = classifier.classify(new DenseVector(2).assign(2)); + assertEquals("[2,2]", "[0.735,0.184,0.082]", AbstractCluster.formatVector(pdf, null)); + } + + @Test + public void testDMClassifierSerialization() throws Exception { + ClusterClassifier classifier = newDMClassifier(); + ClusterClassifier classifierOut = writeAndRead(classifier); + assertEquals(classifier.getModels().size(), classifierOut.getModels().size()); + assertEquals(classifier.getModels().get(0).getClass().getName(), classifierOut.getModels().get(0).getClass() + .getName()); + } + + @Test + public void testClusterClassifierSerialization() throws Exception { + ClusterClassifier classifier = newKlusterClassifier(); + ClusterClassifier classifierOut = writeAndRead(classifier); + assertEquals(classifier.getModels().size(), classifierOut.getModels().size()); + assertEquals(classifier.getModels().get(0).getClass().getName(), classifierOut.getModels().get(0).getClass() + .getName()); + } + + @Test + public void testSoftClusterClassifierSerialization() throws Exception { + ClusterClassifier classifier = newSoftClusterClassifier(); + ClusterClassifier classifierOut = 
writeAndRead(classifier); + assertEquals(classifier.getModels().size(), classifierOut.getModels().size()); + assertEquals(classifier.getModels().get(0).getClass().getName(), classifierOut.getModels().get(0).getClass() + .getName()); + } + + @Test + public void testClusterIteratorKMeans() { + List<Vector> data = TestKmeansClustering.getPoints(TestKmeansClustering.REFERENCE); + ClusterClassifier prior = newKlusterClassifier(); + ClusterClassifier posterior = ClusterIterator.iterate(data, prior, 5); + assertEquals(3, posterior.getModels().size()); + for (Cluster cluster : posterior.getModels()) { + System.out.println(cluster.asFormatString(null)); + } + } + + @Test + public void testClusterIteratorDirichlet() { + List<Vector> data = TestKmeansClustering.getPoints(TestKmeansClustering.REFERENCE); + ClusterClassifier prior = newKlusterClassifier(); + ClusterClassifier posterior = ClusterIterator.iterate(data, prior, 5); + assertEquals(3, posterior.getModels().size()); + for (Cluster cluster : posterior.getModels()) { + System.out.println(cluster.asFormatString(null)); + } + } + + @Test + public void testSeqFileClusterIteratorKMeans() throws IOException { + Path pointsPath = getTestTempDirPath("points"); + Path priorPath = getTestTempDirPath("prior"); + Path outPath = getTestTempDirPath("output"); + Configuration conf = getConfiguration(); + FileSystem fs = FileSystem.get(pointsPath.toUri(), conf); + List<VectorWritable> points = TestKmeansClustering.getPointsWritable(TestKmeansClustering.REFERENCE); + ClusteringTestUtils.writePointsToFile(points, new Path(pointsPath, "file1"), fs, conf); + Path path = new Path(priorPath, "priorClassifier"); + ClusterClassifier prior = newKlusterClassifier(); + prior.writeToSeqFiles(path); + assertEquals(3, prior.getModels().size()); + System.out.println("Prior"); + for (Cluster cluster : prior.getModels()) { + System.out.println(cluster.asFormatString(null)); + } + ClusterIterator.iterateSeq(conf, pointsPath, path, outPath, 5); + + for (int i = 1; i <= 4; i++) { + System.out.println("Classifier-" + i); + ClusterClassifier posterior = new ClusterClassifier(); + String name = i == 4 ? 
"clusters-4-final" : "clusters-" + i; + posterior.readFromSeqFiles(conf, new Path(outPath, name)); + assertEquals(3, posterior.getModels().size()); + for (Cluster cluster : posterior.getModels()) { + System.out.println(cluster.asFormatString(null)); + } + + } + } + + @Test + public void testMRFileClusterIteratorKMeans() throws Exception { + Path pointsPath = getTestTempDirPath("points"); + Path priorPath = getTestTempDirPath("prior"); + Path outPath = getTestTempDirPath("output"); + Configuration conf = getConfiguration(); + FileSystem fs = FileSystem.get(pointsPath.toUri(), conf); + List<VectorWritable> points = TestKmeansClustering.getPointsWritable(TestKmeansClustering.REFERENCE); + ClusteringTestUtils.writePointsToFile(points, new Path(pointsPath, "file1"), fs, conf); + Path path = new Path(priorPath, "priorClassifier"); + ClusterClassifier prior = newKlusterClassifier(); + prior.writeToSeqFiles(path); + ClusteringPolicy policy = new KMeansClusteringPolicy(); + ClusterClassifier.writePolicy(policy, path); + assertEquals(3, prior.getModels().size()); + System.out.println("Prior"); + for (Cluster cluster : prior.getModels()) { + System.out.println(cluster.asFormatString(null)); + } + ClusterIterator.iterateMR(conf, pointsPath, path, outPath, 5); + + for (int i = 1; i <= 4; i++) { + System.out.println("Classifier-" + i); + ClusterClassifier posterior = new ClusterClassifier(); + String name = i == 4 ? "clusters-4-final" : "clusters-" + i; + posterior.readFromSeqFiles(conf, new Path(outPath, name)); + assertEquals(3, posterior.getModels().size()); + for (Cluster cluster : posterior.getModels()) { + System.out.println(cluster.asFormatString(null)); + } + } + } + + @Test + public void testCosineKlusterClassification() { + ClusterClassifier classifier = newCosineKlusterClassifier(); + Vector pdf = classifier.classify(new DenseVector(2)); + assertEquals("[0,0]", "[0.333,0.333,0.333]", AbstractCluster.formatVector(pdf, null)); + pdf = classifier.classify(new DenseVector(2).assign(2)); + assertEquals("[2,2]", "[0.429,0.429,0.143]", AbstractCluster.formatVector(pdf, null)); + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java new file mode 100644 index 0000000..94762e3 --- /dev/null +++ b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/kmeans/TestKmeansClustering.java @@ -0,0 +1,385 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.clustering.kmeans; + +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.util.ToolRunner; +import org.apache.mahout.clustering.ClusteringTestUtils; +import org.apache.mahout.clustering.canopy.CanopyDriver; +import org.apache.mahout.clustering.classify.WeightedPropertyVectorWritable; +import org.apache.mahout.clustering.iterator.ClusterWritable; +import org.apache.mahout.common.DummyOutputCollector; +import org.apache.mahout.common.MahoutTestCase; +import org.apache.mahout.common.Pair; +import org.apache.mahout.common.commandline.DefaultOptionCreator; +import org.apache.mahout.common.distance.DistanceMeasure; +import org.apache.mahout.common.distance.EuclideanDistanceMeasure; +import org.apache.mahout.common.distance.ManhattanDistanceMeasure; +import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable; +import org.apache.mahout.math.DenseVector; +import org.apache.mahout.math.RandomAccessSparseVector; +import org.apache.mahout.math.SequentialAccessSparseVector; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorWritable; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.Lists; +import com.google.common.io.Closeables; + +public final class TestKmeansClustering extends MahoutTestCase { + + public static final double[][] REFERENCE = { {1, 1}, {2, 1}, {1, 2}, {2, 2}, {3, 3}, {4, 4}, {5, 4}, {4, 5}, {5, 5}}; + + private static final int[][] EXPECTED_NUM_POINTS = { {9}, {4, 5}, {4, 4, 1}, {1, 2, 1, 5}, {1, 1, 1, 2, 4}, + {1, 1, 1, 1, 1, 4}, {1, 1, 1, 1, 1, 2, 2}, {1, 1, 1, 1, 1, 1, 2, 1}, {1, 1, 1, 1, 1, 1, 1, 1, 1}}; + + private FileSystem fs; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + Configuration conf = getConfiguration(); + fs = FileSystem.get(conf); + } + + public static List<VectorWritable> getPointsWritable(double[][] raw) { + List<VectorWritable> points = Lists.newArrayList(); + for (double[] fr : raw) { + Vector vec = new RandomAccessSparseVector(fr.length); + vec.assign(fr); + points.add(new VectorWritable(vec)); + } + return points; + } + + public static List<VectorWritable> getPointsWritableDenseVector(double[][] raw) { + List<VectorWritable> points = Lists.newArrayList(); + for (double[] fr : raw) { + Vector vec = new DenseVector(fr.length); + vec.assign(fr); + points.add(new VectorWritable(vec)); + } + return points; + } + + public static List<Vector> getPoints(double[][] raw) { + List<Vector> points = Lists.newArrayList(); + for (double[] fr : raw) { + Vector vec = new SequentialAccessSparseVector(fr.length); + vec.assign(fr); + points.add(vec); + } + return points; + } + + /** + * Tests + * {@link KMeansClusterer#runKMeansIteration(Iterable, Iterable, DistanceMeasure, double)} + * ) single run convergence with a given distance threshold. 
+ */ + /*@Test + public void testRunKMeansIterationConvergesInOneRunWithGivenDistanceThreshold() { + double[][] rawPoints = { {0, 0}, {0, 0.25}, {0, 0.75}, {0, 1}}; + List<Vector> points = getPoints(rawPoints); + + ManhattanDistanceMeasure distanceMeasure = new ManhattanDistanceMeasure(); + List<Kluster> clusters = Arrays.asList(new Kluster(points.get(0), 0, distanceMeasure), new Kluster(points.get(3), + 3, distanceMeasure)); + + // To converge in a single run, the given distance threshold should be + // greater than or equal to 0.125, + // since 0.125 will be the distance between center and centroid for the + // initial two clusters after one run. + double distanceThreshold = 0.25; + + boolean converged = KMeansClusterer.runKMeansIteration(points, clusters, distanceMeasure, distanceThreshold); + + Vector cluster1Center = clusters.get(0).getCenter(); + assertEquals(0, cluster1Center.get(0), EPSILON); + assertEquals(0.125, cluster1Center.get(1), EPSILON); + + Vector cluster2Center = clusters.get(1).getCenter(); + assertEquals(0, cluster2Center.get(0), EPSILON); + assertEquals(0.875, cluster2Center.get(1), EPSILON); + + assertTrue("KMeans iteration should be converged after a single run", converged); + }*/ + + /** Story: User wishes to run kmeans job on reference data */ + @Test + public void testKMeansSeqJob() throws Exception { + DistanceMeasure measure = new EuclideanDistanceMeasure(); + List<VectorWritable> points = getPointsWritable(REFERENCE); + + Path pointsPath = getTestTempDirPath("points"); + Path clustersPath = getTestTempDirPath("clusters"); + Configuration conf = getConfiguration(); + ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file1"), fs, conf); + ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file2"), fs, conf); + for (int k = 1; k < points.size(); k++) { + System.out.println("testKMeansMRJob k= " + k); + // pick k initial cluster centers at random + Path path = new Path(clustersPath, "part-00000"); + FileSystem fs = FileSystem.get(path.toUri(), conf); + SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path, Text.class, Kluster.class); + try { + for (int i = 0; i < k + 1; i++) { + Vector vec = points.get(i).get(); + + Kluster cluster = new Kluster(vec, i, measure); + // add the center so the centroid will be correct upon output + cluster.observe(cluster.getCenter(), 1); + writer.append(new Text(cluster.getIdentifier()), cluster); + } + } finally { + Closeables.close(writer, false); + } + // now run the Job + Path outputPath = getTestTempDirPath("output" + k); + String[] args = {optKey(DefaultOptionCreator.INPUT_OPTION), pointsPath.toString(), + optKey(DefaultOptionCreator.CLUSTERS_IN_OPTION), clustersPath.toString(), + optKey(DefaultOptionCreator.OUTPUT_OPTION), outputPath.toString(), + optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION), EuclideanDistanceMeasure.class.getName(), + optKey(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION), "0.001", + optKey(DefaultOptionCreator.MAX_ITERATIONS_OPTION), "2", optKey(DefaultOptionCreator.CLUSTERING_OPTION), + optKey(DefaultOptionCreator.OVERWRITE_OPTION), optKey(DefaultOptionCreator.METHOD_OPTION), + DefaultOptionCreator.SEQUENTIAL_METHOD}; + ToolRunner.run(conf, new KMeansDriver(), args); + + // now compare the expected clusters with actual + Path clusteredPointsPath = new Path(outputPath, "clusteredPoints"); + int[] expect = EXPECTED_NUM_POINTS[k]; + DummyOutputCollector<IntWritable,WeightedPropertyVectorWritable> collector = new DummyOutputCollector<>(); + // 
The key is the clusterId, the value is the weighted vector + for (Pair<IntWritable,WeightedPropertyVectorWritable> record : new SequenceFileIterable<IntWritable,WeightedPropertyVectorWritable>( + new Path(clusteredPointsPath, "part-m-0"), conf)) { + collector.collect(record.getFirst(), record.getSecond()); + } + assertEquals("clusters[" + k + ']', expect.length, collector.getKeys().size()); + } + } + + /** Story: User wishes to run kmeans job on reference data (DenseVector test) */ + @Test + public void testKMeansSeqJobDenseVector() throws Exception { + DistanceMeasure measure = new EuclideanDistanceMeasure(); + List<VectorWritable> points = getPointsWritableDenseVector(REFERENCE); + + Path pointsPath = getTestTempDirPath("points"); + Path clustersPath = getTestTempDirPath("clusters"); + Configuration conf = getConfiguration(); + ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file1"), fs, conf); + ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file2"), fs, conf); + for (int k = 1; k < points.size(); k++) { + System.out.println("testKMeansMRJob k= " + k); + // pick k initial cluster centers at random + Path path = new Path(clustersPath, "part-00000"); + FileSystem fs = FileSystem.get(path.toUri(), conf); + SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path, Text.class, Kluster.class); + try { + for (int i = 0; i < k + 1; i++) { + Vector vec = points.get(i).get(); + + Kluster cluster = new Kluster(vec, i, measure); + // add the center so the centroid will be correct upon output + cluster.observe(cluster.getCenter(), 1); + writer.append(new Text(cluster.getIdentifier()), cluster); + } + } finally { + Closeables.close(writer, false); + } + // now run the Job + Path outputPath = getTestTempDirPath("output" + k); + String[] args = {optKey(DefaultOptionCreator.INPUT_OPTION), pointsPath.toString(), + optKey(DefaultOptionCreator.CLUSTERS_IN_OPTION), clustersPath.toString(), + optKey(DefaultOptionCreator.OUTPUT_OPTION), outputPath.toString(), + optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION), EuclideanDistanceMeasure.class.getName(), + optKey(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION), "0.001", + optKey(DefaultOptionCreator.MAX_ITERATIONS_OPTION), "2", optKey(DefaultOptionCreator.CLUSTERING_OPTION), + optKey(DefaultOptionCreator.OVERWRITE_OPTION), optKey(DefaultOptionCreator.METHOD_OPTION), + DefaultOptionCreator.SEQUENTIAL_METHOD}; + ToolRunner.run(conf, new KMeansDriver(), args); + + // now compare the expected clusters with actual + Path clusteredPointsPath = new Path(outputPath, "clusteredPoints"); + int[] expect = EXPECTED_NUM_POINTS[k]; + DummyOutputCollector<IntWritable,WeightedPropertyVectorWritable> collector = new DummyOutputCollector<>(); + // The key is the clusterId, the value is the weighted vector + for (Pair<IntWritable,WeightedPropertyVectorWritable> record : new SequenceFileIterable<IntWritable,WeightedPropertyVectorWritable>( + new Path(clusteredPointsPath, "part-m-0"), conf)) { + collector.collect(record.getFirst(), record.getSecond()); + } + assertEquals("clusters[" + k + ']', expect.length, collector.getKeys().size()); + } + } + + /** Story: User wishes to run kmeans job on reference data */ + @Test + public void testKMeansMRJob() throws Exception { + DistanceMeasure measure = new EuclideanDistanceMeasure(); + List<VectorWritable> points = getPointsWritable(REFERENCE); + + Path pointsPath = getTestTempDirPath("points"); + Path clustersPath = getTestTempDirPath("clusters"); + Configuration conf = 
getConfiguration(); + ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file1"), fs, conf); + ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file2"), fs, conf); + for (int k = 1; k < points.size(); k += 3) { + System.out.println("testKMeansMRJob k= " + k); + // pick k initial cluster centers at random + Path path = new Path(clustersPath, "part-00000"); + FileSystem fs = FileSystem.get(path.toUri(), conf); + SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path, Text.class, Kluster.class); + + try { + for (int i = 0; i < k + 1; i++) { + Vector vec = points.get(i).get(); + + Kluster cluster = new Kluster(vec, i, measure); + // add the center so the centroid will be correct upon output + cluster.observe(cluster.getCenter(), 1); + writer.append(new Text(cluster.getIdentifier()), cluster); + } + } finally { + Closeables.close(writer, false); + } + // now run the Job + Path outputPath = getTestTempDirPath("output" + k); + String[] args = {optKey(DefaultOptionCreator.INPUT_OPTION), pointsPath.toString(), + optKey(DefaultOptionCreator.CLUSTERS_IN_OPTION), clustersPath.toString(), + optKey(DefaultOptionCreator.OUTPUT_OPTION), outputPath.toString(), + optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION), EuclideanDistanceMeasure.class.getName(), + optKey(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION), "0.001", + optKey(DefaultOptionCreator.MAX_ITERATIONS_OPTION), "2", optKey(DefaultOptionCreator.CLUSTERING_OPTION), + optKey(DefaultOptionCreator.OVERWRITE_OPTION)}; + ToolRunner.run(getConfiguration(), new KMeansDriver(), args); + + // now compare the expected clusters with actual + Path clusteredPointsPath = new Path(outputPath, "clusteredPoints"); + // assertEquals("output dir files?", 4, outFiles.length); + int[] expect = EXPECTED_NUM_POINTS[k]; + DummyOutputCollector<IntWritable,WeightedPropertyVectorWritable> collector = new DummyOutputCollector<>(); + // The key is the clusterId, the value is the weighted vector + for (Pair<IntWritable,WeightedPropertyVectorWritable> record : new SequenceFileIterable<IntWritable,WeightedPropertyVectorWritable>( + new Path(clusteredPointsPath, "part-m-00000"), conf)) { + collector.collect(record.getFirst(), record.getSecond()); + } + assertEquals("clusters[" + k + ']', expect.length, collector.getKeys().size()); + } + } + + /** + * Story: User wants to use canopy clustering to input the initial clusters + * for kmeans job. 
+ */ + @Test + public void testKMeansWithCanopyClusterInput() throws Exception { + List<VectorWritable> points = getPointsWritable(REFERENCE); + + Path pointsPath = getTestTempDirPath("points"); + Configuration conf = getConfiguration(); + ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file1"), fs, conf); + ClusteringTestUtils.writePointsToFile(points, true, new Path(pointsPath, "file2"), fs, conf); + + Path outputPath = getTestTempDirPath("output"); + // now run the Canopy job + CanopyDriver.run(conf, pointsPath, outputPath, new ManhattanDistanceMeasure(), 3.1, 2.1, false, 0.0, false); + + DummyOutputCollector<Text, ClusterWritable> collector1 = + new DummyOutputCollector<>(); + + FileStatus[] outParts = FileSystem.get(conf).globStatus( + new Path(outputPath, "clusters-0-final/*-0*")); + for (FileStatus outPartStat : outParts) { + for (Pair<Text,ClusterWritable> record : + new SequenceFileIterable<Text,ClusterWritable>( + outPartStat.getPath(), conf)) { + collector1.collect(record.getFirst(), record.getSecond()); + } + } + + boolean got15 = false; + boolean got43 = false; + int count = 0; + for (Text k : collector1.getKeys()) { + count++; + List<ClusterWritable> vl = collector1.getValue(k); + assertEquals("non-singleton centroid!", 1, vl.size()); + ClusterWritable clusterWritable = vl.get(0); + Vector v = clusterWritable.getValue().getCenter(); + assertEquals("cetriod vector is wrong length", 2, v.size()); + if ( (Math.abs(v.get(0) - 1.5) < EPSILON) + && (Math.abs(v.get(1) - 1.5) < EPSILON) + && !got15) { + got15 = true; + } else if ( (Math.abs(v.get(0) - 4.333333333333334) < EPSILON) + && (Math.abs(v.get(1) - 4.333333333333334) < EPSILON) + && !got43) { + got43 = true; + } else { + fail("got unexpected center: " + v + " [" + v.getClass().toString() + ']'); + } + } + assertEquals("got unexpected number of centers", 2, count); + + // now run the KMeans job + Path kmeansOutput = new Path(outputPath, "kmeans"); + KMeansDriver.run(getConfiguration(), pointsPath, new Path(outputPath, "clusters-0-final"), kmeansOutput, + 0.001, 10, true, 0.0, false); + + // now compare the expected clusters with actual + Path clusteredPointsPath = new Path(kmeansOutput, "clusteredPoints"); + DummyOutputCollector<IntWritable,WeightedPropertyVectorWritable> collector = new DummyOutputCollector<>(); + + // The key is the clusterId, the value is the weighted vector + for (Pair<IntWritable,WeightedPropertyVectorWritable> record : new SequenceFileIterable<IntWritable,WeightedPropertyVectorWritable>( + new Path(clusteredPointsPath, "part-m-00000"), conf)) { + collector.collect(record.getFirst(), record.getSecond()); + } + + for (IntWritable k : collector.getKeys()) { + List<WeightedPropertyVectorWritable> wpvList = collector.getValue(k); + assertTrue("empty cluster!", !wpvList.isEmpty()); + if (wpvList.get(0).getVector().get(0) <= 2.0) { + for (WeightedPropertyVectorWritable wv : wpvList) { + Vector v = wv.getVector(); + int idx = v.maxValueIndex(); + assertTrue("bad cluster!", v.get(idx) <= 2.0); + } + assertEquals("Wrong size cluster", 4, wpvList.size()); + } else { + for (WeightedPropertyVectorWritable wv : wpvList) { + Vector v = wv.getVector(); + int idx = v.minValueIndex(); + assertTrue("bad cluster!", v.get(idx) > 2.0); + } + assertEquals("Wrong size cluster", 5, wpvList.size()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/clustering/kmeans/TestRandomSeedGenerator.java 
---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/clustering/kmeans/TestRandomSeedGenerator.java b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/kmeans/TestRandomSeedGenerator.java new file mode 100644 index 0000000..5cb012a --- /dev/null +++ b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/kmeans/TestRandomSeedGenerator.java @@ -0,0 +1,169 @@ + /** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.clustering.kmeans; + +import java.util.Collection; +import java.util.List; + +import com.google.common.collect.Sets; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Job; +import org.apache.mahout.clustering.Cluster; +import org.apache.mahout.clustering.ClusteringTestUtils; +import org.apache.mahout.clustering.iterator.ClusterWritable; +import org.apache.mahout.common.MahoutTestCase; +import org.apache.mahout.common.distance.ManhattanDistanceMeasure; +import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterable; +import org.apache.mahout.math.RandomAccessSparseVector; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorWritable; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.Lists; + +public final class TestRandomSeedGenerator extends MahoutTestCase { + + private static final double[][] RAW = {{1, 1}, {2, 1}, {1, 2}, {2, 2}, + {3, 3}, {4, 4}, {5, 4}, {4, 5}, {5, 5}}; + + private FileSystem fs; + + private static List<VectorWritable> getPoints() { + List<VectorWritable> points = Lists.newArrayList(); + for (double[] fr : RAW) { + Vector vec = new RandomAccessSparseVector(fr.length); + vec.assign(fr); + points.add(new VectorWritable(vec)); + } + return points; + } + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + Configuration conf = getConfiguration(); + fs = FileSystem.get(conf); + } + + /** Story: test random seed generation generates 4 clusters with proper ids and data */ + @Test + public void testRandomSeedGenerator() throws Exception { + List<VectorWritable> points = getPoints(); + Job job = new Job(); + Configuration conf = job.getConfiguration(); + job.setMapOutputValueClass(VectorWritable.class); + Path input = getTestTempFilePath("random-input"); + Path output = getTestTempDirPath("random-output"); + ClusteringTestUtils.writePointsToFile(points, input, fs, conf); + + RandomSeedGenerator.buildRandom(conf, input, output, 4, new ManhattanDistanceMeasure()); + + int clusterCount = 0; + Collection<Integer> set = Sets.newHashSet(); + for (ClusterWritable clusterWritable : + new 
SequenceFileValueIterable<ClusterWritable>(new Path(output, "part-randomSeed"), true, conf)) { + clusterCount++; + Cluster cluster = clusterWritable.getValue(); + int id = cluster.getId(); + assertTrue(set.add(id)); // Validate unique id's + + Vector v = cluster.getCenter(); + assertVectorEquals(RAW[id], v); // Validate values match + } + + assertEquals(4, clusterCount); // Validate sample count + } + + /** Be sure that the buildRandomSeeded works in the same way as RandomSeedGenerator.buildRandom */ + @Test + public void testRandomSeedGeneratorSeeded() throws Exception { + List<VectorWritable> points = getPoints(); + Job job = new Job(); + Configuration conf = job.getConfiguration(); + job.setMapOutputValueClass(VectorWritable.class); + Path input = getTestTempFilePath("random-input"); + Path output = getTestTempDirPath("random-output"); + ClusteringTestUtils.writePointsToFile(points, input, fs, conf); + + RandomSeedGenerator.buildRandom(conf, input, output, 4, new ManhattanDistanceMeasure(), 1L); + + int clusterCount = 0; + Collection<Integer> set = Sets.newHashSet(); + for (ClusterWritable clusterWritable : + new SequenceFileValueIterable<ClusterWritable>(new Path(output, "part-randomSeed"), true, conf)) { + clusterCount++; + Cluster cluster = clusterWritable.getValue(); + int id = cluster.getId(); + assertTrue(set.add(id)); // validate unique id's + + Vector v = cluster.getCenter(); + assertVectorEquals(RAW[id], v); // validate values match + } + + assertEquals(4, clusterCount); // validate sample count + } + + /** Test that initial clusters built with same random seed are reproduced */ + @Test + public void testBuildRandomSeededSameInitalClusters() throws Exception { + List<VectorWritable> points = getPoints(); + Job job = new Job(); + Configuration conf = job.getConfiguration(); + job.setMapOutputValueClass(VectorWritable.class); + Path input = getTestTempFilePath("random-input"); + Path output = getTestTempDirPath("random-output"); + ClusteringTestUtils.writePointsToFile(points, input, fs, conf); + long randSeed=1; + + RandomSeedGenerator.buildRandom(conf, input, output, 4, new ManhattanDistanceMeasure(), randSeed); + + int[] clusterIDSeq = new int[4]; + + /** run through all clusters once and set sequence of IDs */ + int clusterCount = 0; + for (ClusterWritable clusterWritable : + new SequenceFileValueIterable<ClusterWritable>(new Path(output, "part-randomSeed"), true, conf)) { + Cluster cluster = clusterWritable.getValue(); + clusterIDSeq[clusterCount] = cluster.getId(); + clusterCount++; + } + + /* Rebuild cluster and run through again making sure all IDs are in the same random sequence + * Needs a better test because in this case passes when seeded with 1 and 2 fails with 1, 3 + * passes when set to two */ + RandomSeedGenerator.buildRandom(conf, input, output, 4, new ManhattanDistanceMeasure(), randSeed); clusterCount = 0; + for (ClusterWritable clusterWritable : + new SequenceFileValueIterable<ClusterWritable>(new Path(output, "part-randomSeed"), true, conf)) { + Cluster cluster = clusterWritable.getValue(); + // Make sure cluster ids are in same random sequence + assertEquals(clusterIDSeq[clusterCount], cluster.getId()); + clusterCount++; + } + } + + private static void assertVectorEquals(double[] raw, Vector v) { + assertEquals(raw.length, v.size()); + for (int i = 0; i < raw.length; i++) { + assertEquals(raw[i], v.getQuick(i), EPSILON); + } + } +} 
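TestRandomSeedGenerator above verifies that buildRandom picks k of the input points as the initial clusters, that the chosen cluster ids are unique, and that supplying the same seed reproduces the same selection. As an illustration of that idea outside Hadoop (a sketch only, not Mahout's RandomSeedGenerator; the class name is hypothetical), seeded reservoir sampling shows the same reproducibility property:

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Sketch: pick k "seed" points from the input with reservoir sampling.
// The same seed always yields the same selection, mirroring buildRandom(..., seed).
public final class SeedSelectionSketch {

  static List<double[]> chooseSeeds(List<double[]> points, int k, long seed) {
    Random random = new Random(seed);
    List<double[]> chosen = new ArrayList<>();
    for (int i = 0; i < points.size(); i++) {
      if (chosen.size() < k) {
        chosen.add(points.get(i));                     // fill the reservoir first
      } else if (random.nextInt(i + 1) < k) {          // keep point i with probability k/(i+1)
        chosen.set(random.nextInt(k), points.get(i));  // replace a random slot
      }
    }
    return chosen;
  }

  public static void main(String[] args) {
    double[][] data = { {1, 1}, {2, 1}, {1, 2}, {2, 2}, {3, 3}, {4, 4}, {5, 4}, {4, 5}, {5, 5} };
    List<double[]> raw = new ArrayList<>();
    for (double[] p : data) {
      raw.add(p);
    }
    // Two runs with the same seed select the same initial centers, so this prints true.
    System.out.println(chooseSeeds(raw, 4, 1L).equals(chooseSeeds(raw, 4, 1L)));
  }
}

Running chooseSeeds twice with the same seed returns the same points in the same order, which is the property the seeded tests above assert for buildRandom.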
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/clustering/lda/cvb/TestCVBModelTrainer.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/clustering/lda/cvb/TestCVBModelTrainer.java b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/lda/cvb/TestCVBModelTrainer.java new file mode 100644 index 0000000..dd4360a --- /dev/null +++ b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/lda/cvb/TestCVBModelTrainer.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.mahout.clustering.lda.cvb; + +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.mahout.clustering.ClusteringTestUtils; +import org.apache.mahout.common.MahoutTestCase; +import org.apache.mahout.common.RandomUtils; +import org.apache.mahout.math.Matrix; +import org.apache.mahout.math.MatrixUtils; +import org.apache.mahout.math.function.DoubleFunction; +import org.junit.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +public final class TestCVBModelTrainer extends MahoutTestCase { + + private static final double ETA = 0.1; + private static final double ALPHA = 0.1; + + @Test + public void testInMemoryCVB0() throws Exception { + String[] terms = new String[26]; + for (int i=0; i<terms.length; i++) { + terms[i] = String.valueOf((char) (i + 'a')); + } + int numGeneratingTopics = 3; + int numTerms = 26; + Matrix matrix = ClusteringTestUtils.randomStructuredModel(numGeneratingTopics, numTerms, new DoubleFunction() { + @Override public double apply(double d) { + return 1.0 / Math.pow(d + 1.0, 2); + } + }); + + int numDocs = 100; + int numSamples = 20; + int numTopicsPerDoc = 1; + + Matrix sampledCorpus = ClusteringTestUtils.sampledCorpus(matrix, RandomUtils.getRandom(), + numDocs, numSamples, numTopicsPerDoc); + + List<Double> perplexities = Lists.newArrayList(); + int numTrials = 1; + for (int numTestTopics = 1; numTestTopics < 2 * numGeneratingTopics; numTestTopics++) { + double[] perps = new double[numTrials]; + for (int trial = 0; trial < numTrials; trial++) { + InMemoryCollapsedVariationalBayes0 cvb = + new InMemoryCollapsedVariationalBayes0(sampledCorpus, terms, numTestTopics, ALPHA, ETA, 2, 1, 0); + cvb.setVerbose(true); + perps[trial] = cvb.iterateUntilConvergence(0, 5, 0, 0.2); + System.out.println(perps[trial]); + } + Arrays.sort(perps); + System.out.println(Arrays.toString(perps)); + perplexities.add(perps[0]); + } + System.out.println(Joiner.on(",").join(perplexities)); + } + + @Test + public void 
testRandomStructuredModelViaMR() throws Exception { + int numGeneratingTopics = 3; + int numTerms = 9; + Matrix matrix = ClusteringTestUtils.randomStructuredModel(numGeneratingTopics, numTerms, new DoubleFunction() { + @Override + public double apply(double d) { + return 1.0 / Math.pow(d + 1.0, 3); + } + }); + + int numDocs = 500; + int numSamples = 10; + int numTopicsPerDoc = 1; + + Matrix sampledCorpus = ClusteringTestUtils.sampledCorpus(matrix, RandomUtils.getRandom(1234), + numDocs, numSamples, numTopicsPerDoc); + + Path sampleCorpusPath = getTestTempDirPath("corpus"); + Configuration configuration = getConfiguration(); + MatrixUtils.write(sampleCorpusPath, configuration, sampledCorpus); + int numIterations = 5; + List<Double> perplexities = Lists.newArrayList(); + int startTopic = numGeneratingTopics - 1; + int numTestTopics = startTopic; + while (numTestTopics < numGeneratingTopics + 2) { + Path topicModelStateTempPath = getTestTempDirPath("topicTemp" + numTestTopics); + Configuration conf = getConfiguration(); + CVB0Driver cvb0Driver = new CVB0Driver(); + cvb0Driver.run(conf, sampleCorpusPath, null, numTestTopics, numTerms, + ALPHA, ETA, numIterations, 1, 0, null, null, topicModelStateTempPath, 1234, 0.2f, 2, + 1, 3, 1, false); + perplexities.add(lowestPerplexity(conf, topicModelStateTempPath)); + numTestTopics++; + } + int bestTopic = -1; + double lowestPerplexity = Double.MAX_VALUE; + for (int t = 0; t < perplexities.size(); t++) { + if (perplexities.get(t) < lowestPerplexity) { + lowestPerplexity = perplexities.get(t); + bestTopic = t + startTopic; + } + } + assertEquals("The optimal number of topics is not that of the generating distribution", 4, bestTopic); + System.out.println("Perplexities: " + Joiner.on(", ").join(perplexities)); + } + + private static double lowestPerplexity(Configuration conf, Path topicModelTemp) + throws IOException { + double lowest = Double.MAX_VALUE; + double current; + int iteration = 2; + while (!Double.isNaN(current = CVB0Driver.readPerplexity(conf, topicModelTemp, iteration))) { + lowest = Math.min(current, lowest); + iteration++; + } + return lowest; + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestAffinityMatrixInputJob.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestAffinityMatrixInputJob.java b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestAffinityMatrixInputJob.java new file mode 100644 index 0000000..cd52bbd --- /dev/null +++ b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestAffinityMatrixInputJob.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.clustering.spectral; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.mahout.common.DummyRecordWriter; +import org.apache.mahout.common.MahoutTestCase; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorWritable; +import org.apache.mahout.math.hadoop.DistributedRowMatrix.MatrixEntryWritable; +import org.junit.Test; + +/** + * <p>Tests the affinity matrix input M/R task.</p> + * + * <p>The tricky item with this task is that the format of the input + * must be correct; it must take the form of a graph input, and for the + * current implementation, the input must be symmetric, e.g. the weight + * from node A to B = the weight from node B to A. This is not explicitly + * enforced within the task itself (since, as of the time these tests were + * written, we have not yet decided on a final rule regarding the + * symmetry/non-symmetry of the affinity matrix, so we are unofficially + * enforcing symmetry). Input looks something like this:</p> + * + * <pre>0, 0, 0 + * 0, 1, 10 + * 0, 2, 20 + * ... + * 1, 0, 10 + * 2, 0, 20 + * ...</pre> + * + * <p>The mapper's task is simply to convert each line of text into a + * DistributedRowMatrix entry, allowing the reducer to join each entry + * of the same row into a VectorWritable.</p> + * + * <p>Exceptions are thrown in cases of bad input format: if there are + * more or fewer than 3 numbers per line, or any of the numbers are missing. 
+ */ +public class TestAffinityMatrixInputJob extends MahoutTestCase { + + private static final String [] RAW = {"0,0,0", "0,1,5", "0,2,10", "1,0,5", "1,1,0", + "1,2,20", "2,0,10", "2,1,20", "2,2,0"}; + private static final int RAW_DIMENSIONS = 3; + + @Test + public void testAffinityMatrixInputMapper() throws Exception { + AffinityMatrixInputMapper mapper = new AffinityMatrixInputMapper(); + Configuration conf = getConfiguration(); + conf.setInt(Keys.AFFINITY_DIMENSIONS, RAW_DIMENSIONS); + + // set up the dummy writer and the M/R context + DummyRecordWriter<IntWritable, MatrixEntryWritable> writer = + new DummyRecordWriter<>(); + Mapper<LongWritable, Text, IntWritable, MatrixEntryWritable>.Context + context = DummyRecordWriter.build(mapper, conf, writer); + + // loop through all the points and test each one is converted + // successfully to a DistributedRowMatrix.MatrixEntry + for (String s : RAW) { + mapper.map(new LongWritable(), new Text(s), context); + } + + // test the data was successfully constructed + assertEquals("Number of map results", RAW_DIMENSIONS, writer.getData().size()); + Set<IntWritable> keys = writer.getData().keySet(); + for (IntWritable i : keys) { + List<MatrixEntryWritable> row = writer.getData().get(i); + assertEquals("Number of items in row", RAW_DIMENSIONS, row.size()); + } + } + + @Test + public void testAffinitymatrixInputReducer() throws Exception { + AffinityMatrixInputMapper mapper = new AffinityMatrixInputMapper(); + Configuration conf = getConfiguration(); + conf.setInt(Keys.AFFINITY_DIMENSIONS, RAW_DIMENSIONS); + + // set up the dummy writer and the M/R context + DummyRecordWriter<IntWritable, MatrixEntryWritable> mapWriter = + new DummyRecordWriter<>(); + Mapper<LongWritable, Text, IntWritable, MatrixEntryWritable>.Context + mapContext = DummyRecordWriter.build(mapper, conf, mapWriter); + + // loop through all the points and test each one is converted + // successfully to a DistributedRowMatrix.MatrixEntry + for (String s : RAW) { + mapper.map(new LongWritable(), new Text(s), mapContext); + } + // store the data for checking later + Map<IntWritable, List<MatrixEntryWritable>> map = mapWriter.getData(); + + // now reduce the data + AffinityMatrixInputReducer reducer = new AffinityMatrixInputReducer(); + DummyRecordWriter<IntWritable, VectorWritable> redWriter = + new DummyRecordWriter<>(); + Reducer<IntWritable, MatrixEntryWritable, + IntWritable, VectorWritable>.Context redContext = DummyRecordWriter + .build(reducer, conf, redWriter, IntWritable.class, MatrixEntryWritable.class); + for (IntWritable key : mapWriter.getKeys()) { + reducer.reduce(key, mapWriter.getValue(key), redContext); + } + + // check that all the elements are correctly ordered + assertEquals("Number of reduce results", RAW_DIMENSIONS, redWriter.getData().size()); + for (IntWritable row : redWriter.getKeys()) { + List<VectorWritable> list = redWriter.getValue(row); + assertEquals("Should only be one vector", 1, list.size()); + // check that the elements in the array are correctly ordered + Vector v = list.get(0).get(); + for (Vector.Element e : v.all()) { + // find this value in the original map + MatrixEntryWritable toCompare = new MatrixEntryWritable(); + toCompare.setRow(-1); + toCompare.setCol(e.index()); + toCompare.setVal(e.get()); + assertTrue("This entry was correctly placed in its row", map.get(row).contains(toCompare)); + } + } + } +} 
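The affinity-matrix tests above push "row,col,weight" text lines through AffinityMatrixInputMapper and AffinityMatrixInputReducer and check that each row ends up as a single vector. The same conversion can be sketched without Hadoop as follows (an illustration only, not the Mahout mapper; the class name is hypothetical), including the malformed-line rejection the class javadoc describes:

// Sketch: turn "row,col,weight" lines into a dense affinity matrix,
// rejecting lines that do not have exactly three fields.
public final class AffinityParseSketch {

  static double[][] parse(String[] lines, int dimensions) {
    double[][] affinity = new double[dimensions][dimensions];
    for (String line : lines) {
      String[] fields = line.split(",");
      if (fields.length != 3) {
        throw new IllegalArgumentException("Expected 3 fields: " + line);
      }
      int row = Integer.parseInt(fields[0].trim());
      int col = Integer.parseInt(fields[1].trim());
      affinity[row][col] = Double.parseDouble(fields[2].trim());
    }
    return affinity;
  }

  public static void main(String[] args) {
    String[] raw = {"0,0,0", "0,1,5", "0,2,10", "1,0,5", "1,1,0",
                    "1,2,20", "2,0,10", "2,1,20", "2,2,0"};
    double[][] a = parse(raw, 3);
    // The test data is symmetric, so this prints true: a[i][j] == a[j][i].
    System.out.println(a[0][2] == a[2][0]);
  }
}

For the symmetric RAW data used in the tests, each row of the parsed matrix should correspond to one of the vectors the reducer emits.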
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestMatrixDiagonalizeJob.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestMatrixDiagonalizeJob.java b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestMatrixDiagonalizeJob.java new file mode 100644 index 0000000..c256890 --- /dev/null +++ b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestMatrixDiagonalizeJob.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.clustering.spectral; + +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.mahout.clustering.spectral.MatrixDiagonalizeJob.MatrixDiagonalizeMapper; +import org.apache.mahout.clustering.spectral.MatrixDiagonalizeJob.MatrixDiagonalizeReducer; +import org.apache.mahout.common.DummyRecordWriter; +import org.apache.mahout.common.MahoutTestCase; +import org.apache.mahout.math.RandomAccessSparseVector; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorWritable; +import org.junit.Test; + +/** + * <p>The MatrixDiagonalize task is pretty simple: given a matrix, + * it sums the elements of the row, and sticks the sum in position (i, i) + * of a new matrix of identical dimensions to the original.</p> + */ +public class TestMatrixDiagonalizeJob extends MahoutTestCase { + + private static final double[][] RAW = { {1, 2, 3}, {4, 5, 6}, {7, 8, 9} }; + private static final int RAW_DIMENSIONS = 3; + + private static double rowSum(double [] row) { + double sum = 0; + for (double r : row) { + sum += r; + } + return sum; + } + + @Test + public void testMatrixDiagonalizeMapper() throws Exception { + MatrixDiagonalizeMapper mapper = new MatrixDiagonalizeMapper(); + Configuration conf = getConfiguration(); + conf.setInt(Keys.AFFINITY_DIMENSIONS, RAW_DIMENSIONS); + + // set up the dummy writers + DummyRecordWriter<NullWritable, IntDoublePairWritable> writer = + new DummyRecordWriter<>(); + Mapper<IntWritable, VectorWritable, NullWritable, IntDoublePairWritable>.Context + context = DummyRecordWriter.build(mapper, conf, writer); + + // perform the mapping + for (int i = 0; i < RAW_DIMENSIONS; i++) { + RandomAccessSparseVector toAdd = new RandomAccessSparseVector(RAW_DIMENSIONS); + toAdd.assign(RAW[i]); + mapper.map(new IntWritable(i), new VectorWritable(toAdd), context); + } + + // check the number of the results + 
assertEquals("Number of map results", RAW_DIMENSIONS, + writer.getValue(NullWritable.get()).size()); + } + + @Test + public void testMatrixDiagonalizeReducer() throws Exception { + MatrixDiagonalizeMapper mapper = new MatrixDiagonalizeMapper(); + Configuration conf = getConfiguration(); + conf.setInt(Keys.AFFINITY_DIMENSIONS, RAW_DIMENSIONS); + + // set up the dummy writers + DummyRecordWriter<NullWritable, IntDoublePairWritable> mapWriter = + new DummyRecordWriter<>(); + Mapper<IntWritable, VectorWritable, NullWritable, IntDoublePairWritable>.Context + mapContext = DummyRecordWriter.build(mapper, conf, mapWriter); + + // perform the mapping + for (int i = 0; i < RAW_DIMENSIONS; i++) { + RandomAccessSparseVector toAdd = new RandomAccessSparseVector(RAW_DIMENSIONS); + toAdd.assign(RAW[i]); + mapper.map(new IntWritable(i), new VectorWritable(toAdd), mapContext); + } + + // now perform the reduction + MatrixDiagonalizeReducer reducer = new MatrixDiagonalizeReducer(); + DummyRecordWriter<NullWritable, VectorWritable> redWriter = new + DummyRecordWriter<>(); + Reducer<NullWritable, IntDoublePairWritable, NullWritable, VectorWritable>.Context + redContext = DummyRecordWriter.build(reducer, conf, redWriter, + NullWritable.class, IntDoublePairWritable.class); + + // only need one reduction + reducer.reduce(NullWritable.get(), mapWriter.getValue(NullWritable.get()), redContext); + + // first, make sure there's only one result + List<VectorWritable> list = redWriter.getValue(NullWritable.get()); + assertEquals("Only a single resulting vector", 1, list.size()); + Vector v = list.get(0).get(); + for (int i = 0; i < v.size(); i++) { + assertEquals("Element sum is correct", rowSum(RAW[i]), v.get(i),0.01); + } + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestUnitVectorizerJob.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestUnitVectorizerJob.java b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestUnitVectorizerJob.java new file mode 100644 index 0000000..c971572 --- /dev/null +++ b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestUnitVectorizerJob.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.clustering.spectral; + +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.mahout.clustering.spectral.UnitVectorizerJob.UnitVectorizerMapper; +import org.apache.mahout.common.DummyRecordWriter; +import org.apache.mahout.common.MahoutTestCase; +import org.apache.mahout.math.RandomAccessSparseVector; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorWritable; +import org.junit.Test; + +public class TestUnitVectorizerJob extends MahoutTestCase { + + private static final double [][] RAW = { {1, 2, 3}, {4, 5, 6}, {7, 8, 9} }; + + @Test + public void testUnitVectorizerMapper() throws Exception { + UnitVectorizerMapper mapper = new UnitVectorizerMapper(); + Configuration conf = getConfiguration(); + + // set up the dummy writers + DummyRecordWriter<IntWritable, VectorWritable> writer = new + DummyRecordWriter<>(); + Mapper<IntWritable, VectorWritable, IntWritable, VectorWritable>.Context + context = DummyRecordWriter.build(mapper, conf, writer); + + // perform the mapping + for (int i = 0; i < RAW.length; i++) { + Vector vector = new RandomAccessSparseVector(RAW[i].length); + vector.assign(RAW[i]); + mapper.map(new IntWritable(i), new VectorWritable(vector), context); + } + + // check the results + assertEquals("Number of map results", RAW.length, writer.getData().size()); + for (int i = 0; i < RAW.length; i++) { + IntWritable key = new IntWritable(i); + List<VectorWritable> list = writer.getValue(key); + assertEquals("Only one element per row", 1, list.size()); + Vector v = list.get(0).get(); + assertTrue("Vector 2-norm is 1 within 1e-6", Math.abs(v.norm(2) - 1) < 0.000001); + } + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestVectorCache.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestVectorCache.java b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestVectorCache.java new file mode 100644 index 0000000..e09bbe4 --- /dev/null +++ b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestVectorCache.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.mahout.clustering.spectral; + +import java.net.URI; + +import com.google.common.io.Closeables; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.filecache.DistributedCache; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Writable; +import org.apache.mahout.common.HadoopUtil; +import org.apache.mahout.common.MahoutTestCase; +import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterator; +import org.apache.mahout.math.DenseVector; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorWritable; +import org.junit.Test; + +public class TestVectorCache extends MahoutTestCase { + + private static final double [] VECTOR = { 1, 2, 3, 4 }; + + @Test + public void testSave() throws Exception { + Configuration conf = getConfiguration(); + Writable key = new IntWritable(0); + Vector value = new DenseVector(VECTOR); + Path path = getTestTempDirPath("output"); + + // write the vector out + VectorCache.save(key, value, path, conf, true, true); + + // can we read it from here? + SequenceFileValueIterator<VectorWritable> iterator = + new SequenceFileValueIterator<>(path, true, conf); + try { + VectorWritable old = iterator.next(); + // test if the values are identical + assertEquals("Saved vector is identical to original", old.get(), value); + } finally { + Closeables.close(iterator, true); + } + } + + @Test + public void testLoad() throws Exception { + // save a vector manually + Configuration conf = getConfiguration(); + Writable key = new IntWritable(0); + Vector value = new DenseVector(VECTOR); + Path path = getTestTempDirPath("output"); + + FileSystem fs = FileSystem.get(path.toUri(), conf); + // write the vector + path = fs.makeQualified(path); + fs.deleteOnExit(path); + HadoopUtil.delete(conf, path); + SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path, IntWritable.class, VectorWritable.class); + try { + writer.append(key, new VectorWritable(value)); + } finally { + Closeables.close(writer, false); + } + DistributedCache.setCacheFiles(new URI[] {path.toUri()}, conf); + + // load it + Vector result = VectorCache.load(conf); + + // are they the same? + assertNotNull("Vector is null", result); + assertEquals("Loaded vector is not identical to original", result, value); + } + + @Test + public void testAll() throws Exception { + Configuration conf = getConfiguration(); + Vector v = new DenseVector(VECTOR); + Path toSave = getTestTempDirPath("output"); + Writable key = new IntWritable(0); + + // save it + VectorCache.save(key, v, toSave, conf); + + // now, load it back + Vector v2 = VectorCache.load(conf); + + // are they the same? 
+ assertNotNull("Vector is null", v2); + assertEquals("Vectors are not identical", v2, v); + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestVectorMatrixMultiplicationJob.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestVectorMatrixMultiplicationJob.java b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestVectorMatrixMultiplicationJob.java new file mode 100644 index 0000000..8cd52f4 --- /dev/null +++ b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/TestVectorMatrixMultiplicationJob.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.clustering.spectral; + +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.mahout.clustering.spectral.VectorMatrixMultiplicationJob.VectorMatrixMultiplicationMapper; +import org.apache.mahout.common.DummyRecordWriter; +import org.apache.mahout.common.MahoutTestCase; +import org.apache.mahout.math.DenseVector; +import org.apache.mahout.math.RandomAccessSparseVector; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorWritable; +import org.junit.Test; + +/** + * <p>This test ensures that a Vector can be successfully multiplied + * with a matrix.</p> + */ +public class TestVectorMatrixMultiplicationJob extends MahoutTestCase { + + private static final double [][] MATRIX = { {1, 1}, {2, 3} }; + private static final double [] VECTOR = {9, 16}; + + @Test + public void testVectorMatrixMultiplicationMapper() throws Exception { + VectorMatrixMultiplicationMapper mapper = new VectorMatrixMultiplicationMapper(); + Configuration conf = getConfiguration(); + + // set up all the parameters for the job + Vector toSave = new DenseVector(VECTOR); + DummyRecordWriter<IntWritable, VectorWritable> writer = new + DummyRecordWriter<>(); + Mapper<IntWritable, VectorWritable, IntWritable, VectorWritable>.Context + context = DummyRecordWriter.build(mapper, conf, writer); + mapper.setup(toSave); + + // run the job + for (int i = 0; i < MATRIX.length; i++) { + Vector v = new RandomAccessSparseVector(MATRIX[i].length); + v.assign(MATRIX[i]); + mapper.map(new IntWritable(i), new VectorWritable(v), context); + } + + // check the results + assertEquals("Number of map results", MATRIX.length, writer.getData().size()); + for (int i = 0; i < MATRIX.length; i++) { + List<VectorWritable> list = writer.getValue(new IntWritable(i)); + assertEquals("Only one vector per key", 1, list.size()); + Vector v 
= list.get(0).get(); + for (int j = 0; j < MATRIX[i].length; j++) { + double total = Math.sqrt(VECTOR[i]) * Math.sqrt(VECTOR[j]) * MATRIX[i][j]; + assertEquals("Product matrix elements", total, v.get(j),EPSILON); + } + } + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/kmeans/TestEigenSeedGenerator.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/kmeans/TestEigenSeedGenerator.java b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/kmeans/TestEigenSeedGenerator.java new file mode 100644 index 0000000..5c73bbc --- /dev/null +++ b/community/mahout-mr/src/test/java/org/apache/mahout/clustering/spectral/kmeans/TestEigenSeedGenerator.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.clustering.spectral.kmeans; + +import java.util.Collection; +import java.util.HashSet; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Job; +import org.apache.mahout.clustering.Cluster; +import org.apache.mahout.clustering.ClusteringTestUtils; +import org.apache.mahout.clustering.iterator.ClusterWritable; +import org.apache.mahout.clustering.spectral.kmeans.EigenSeedGenerator; +import org.apache.mahout.common.MahoutTestCase; +import org.apache.mahout.common.distance.ManhattanDistanceMeasure; +import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterable; +import org.apache.mahout.math.RandomAccessSparseVector; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorWritable; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.Lists; + +public final class TestEigenSeedGenerator extends MahoutTestCase { + + private + static final double[][] RAW = {{1, 0, 0}, {1, 0, 0}, {0, 1, 0}, {0, 1, 0}, + {0, 1, 0}, {0, 0, 1}, {0, 0, 1}}; + + private FileSystem fs; + + private static List<VectorWritable> getPoints() { + List<VectorWritable> points = Lists.newArrayList(); + for (double[] fr : RAW) { + Vector vec = new RandomAccessSparseVector(fr.length); + vec.assign(fr); + points.add(new VectorWritable(vec)); + } + return points; + } + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + Configuration conf = getConfiguration(); + fs = FileSystem.get(conf); + } + + @Test + public void testEigenSeedGenerator() throws Exception { + List<VectorWritable> points = getPoints(); + Job job = new Job(); + Configuration conf = job.getConfiguration(); + 
job.setMapOutputValueClass(VectorWritable.class); + Path input = getTestTempFilePath("eigen-input"); + Path output = getTestTempDirPath("eigen-output"); + ClusteringTestUtils.writePointsToFile(points, input, fs, conf); + + EigenSeedGenerator.buildFromEigens(conf, input, output, 3, new ManhattanDistanceMeasure()); + + int clusterCount = 0; + Collection<Integer> set = new HashSet<>(); + Vector v[] = new Vector[3]; + for (ClusterWritable clusterWritable : + new SequenceFileValueIterable<ClusterWritable>( + new Path(output, "part-eigenSeed"), true, conf)) { + Cluster cluster = clusterWritable.getValue(); + int id = cluster.getId(); + assertTrue(set.add(id)); // validate unique id's + v[id] = cluster.getCenter(); + clusterCount++; + } + assertEquals(3, clusterCount); // validate sample count + // validate pair-wise orthogonality + assertEquals(0, v[0].dot(v[1]), 1E-10); + assertEquals(0, v[1].dot(v[2]), 1E-10); + assertEquals(0, v[0].dot(v[2]), 1E-10); + } + +}
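TestMatrixDiagonalizeJob above verifies exactly what its javadoc describes: the sum of each input row ends up at position (i, i) of a diagonal matrix with the same dimensions as the original. A minimal sketch of that computation with plain arrays (no Hadoop; the class name DiagonalizeSketch is illustrative, not part of the patch):

public class DiagonalizeSketch {

  static double[][] diagonalize(double[][] matrix) {
    int n = matrix.length;
    double[][] diagonal = new double[n][n];
    for (int i = 0; i < n; i++) {
      double sum = 0;
      for (double value : matrix[i]) {
        sum += value;        // the same row sum the test's rowSum() helper computes
      }
      diagonal[i][i] = sum;  // off-diagonal entries stay zero
    }
    return diagonal;
  }

  public static void main(String[] args) {
    // the RAW matrix from TestMatrixDiagonalizeJob
    double[][] raw = { {1, 2, 3}, {4, 5, 6}, {7, 8, 9} };
    double[][] d = diagonalize(raw);
    for (int i = 0; i < d.length; i++) {
      System.out.println("d[" + i + "][" + i + "] = " + d[i][i]); // prints 6.0, 15.0, 24.0
    }
  }
}

Running this on the test's RAW matrix prints the row sums 6, 15 and 24, the same values the reducer test compares against v.get(i) with a 0.01 tolerance.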

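Similarly, TestVectorMatrixMultiplicationJob checks every output entry against sqrt(VECTOR[i]) * sqrt(VECTOR[j]) * MATRIX[i][j], i.e. the affinity entry scaled by the square roots of the diagonal values for its row and column. A standalone sketch of that expected value, using the same MATRIX and VECTOR constants as the test (plain arrays, class name illustrative):

public class DiagonalScalingSketch {

  public static void main(String[] args) {
    double[][] matrix = { {1, 1}, {2, 3} };   // MATRIX from the test
    double[] diagonal = {9, 16};              // VECTOR from the test

    // each entry is scaled by sqrt(d_i) * sqrt(d_j), matching the value
    // the test computes as "total" and compares to the mapper output
    for (int i = 0; i < matrix.length; i++) {
      for (int j = 0; j < matrix[i].length; j++) {
        double scaled = Math.sqrt(diagonal[i]) * Math.sqrt(diagonal[j]) * matrix[i][j];
        System.out.printf("entry (%d,%d) = %.1f%n", i, j, scaled); // 9.0, 12.0, 24.0, 48.0
      }
    }
  }
}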