http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/test/java/org/apache/mahout/clustering/lda/cvb/TestCVBModelTrainer.java ---------------------------------------------------------------------- diff --git a/mr/src/test/java/org/apache/mahout/clustering/lda/cvb/TestCVBModelTrainer.java b/mr/src/test/java/org/apache/mahout/clustering/lda/cvb/TestCVBModelTrainer.java new file mode 100644 index 0000000..dd4360a --- /dev/null +++ b/mr/src/test/java/org/apache/mahout/clustering/lda/cvb/TestCVBModelTrainer.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
package org.apache.mahout.clustering.lda.cvb;

import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.mahout.clustering.ClusteringTestUtils;
import org.apache.mahout.common.MahoutTestCase;
import org.apache.mahout.common.RandomUtils;
import org.apache.mahout.math.Matrix;
import org.apache.mahout.math.MatrixUtils;
import org.apache.mahout.math.function.DoubleFunction;
import org.junit.Test;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

/**
 * Tests for the CVB0 LDA trainers: the in-memory {@link InMemoryCollapsedVariationalBayes0}
 * and the map/reduce {@link CVB0Driver}, both run against synthetic corpora sampled from a
 * randomly structured topic model.
 */
public final class TestCVBModelTrainer extends MahoutTestCase {

  private static final double ETA = 0.1;
  private static final double ALPHA = 0.1;

  /**
   * Trains the in-memory CVB0 implementation for 1 .. (2 * numGeneratingTopics - 1) topics and
   * prints the lowest perplexity observed for each topic count. This test only reports results;
   * it makes no assertions.
   */
  @Test
  public void testInMemoryCVB0() throws Exception {
    int numGeneratingTopics = 3;
    int numTerms = 26;
    // One single-character term per vocabulary entry ("a", "b", ...). Sized from numTerms so the
    // dictionary can never drift out of sync with the model's vocabulary size.
    String[] terms = new String[numTerms];
    for (int i = 0; i < terms.length; i++) {
      terms[i] = String.valueOf((char) (i + 'a'));
    }
    Matrix matrix = ClusteringTestUtils.randomStructuredModel(numGeneratingTopics, numTerms, new DoubleFunction() {
      @Override public double apply(double d) {
        return 1.0 / Math.pow(d + 1.0, 2);
      }
    });

    int numDocs = 100;
    int numSamples = 20;
    int numTopicsPerDoc = 1;

    Matrix sampledCorpus = ClusteringTestUtils.sampledCorpus(matrix, RandomUtils.getRandom(),
        numDocs, numSamples, numTopicsPerDoc);

    List<Double> perplexities = Lists.newArrayList();
    int numTrials = 1;
    for (int numTestTopics = 1; numTestTopics < 2 * numGeneratingTopics; numTestTopics++) {
      double[] perps = new double[numTrials];
      for (int trial = 0; trial < numTrials; trial++) {
        InMemoryCollapsedVariationalBayes0 cvb =
            new InMemoryCollapsedVariationalBayes0(sampledCorpus, terms, numTestTopics, ALPHA, ETA, 2, 1, 0);
        cvb.setVerbose(true);
        perps[trial] = cvb.iterateUntilConvergence(0, 5, 0, 0.2);
        System.out.println(perps[trial]);
      }
      // keep the best (lowest) perplexity across all trials for this topic count
      Arrays.sort(perps);
      System.out.println(Arrays.toString(perps));
      perplexities.add(perps[0]);
    }
    System.out.println(Joiner.on(",").join(perplexities));
  }

  /**
   * Runs the map/reduce CVB0 driver for (numGeneratingTopics - 1) .. (numGeneratingTopics + 1)
   * topics on a sampled corpus and checks which topic count reached the lowest perplexity.
   */
  @Test
  public void testRandomStructuredModelViaMR() throws Exception {
    int numGeneratingTopics = 3;
    int numTerms = 9;
    Matrix matrix = ClusteringTestUtils.randomStructuredModel(numGeneratingTopics, numTerms, new DoubleFunction() {
      @Override
      public double apply(double d) {
        return 1.0 / Math.pow(d + 1.0, 3);
      }
    });

    int numDocs = 500;
    int numSamples = 10;
    int numTopicsPerDoc = 1;

    Matrix sampledCorpus = ClusteringTestUtils.sampledCorpus(matrix, RandomUtils.getRandom(1234),
        numDocs, numSamples, numTopicsPerDoc);

    Path sampleCorpusPath = getTestTempDirPath("corpus");
    Configuration configuration = getConfiguration();
    MatrixUtils.write(sampleCorpusPath, configuration, sampledCorpus);
    int numIterations = 5;
    List<Double> perplexities = Lists.newArrayList();
    int startTopic = numGeneratingTopics - 1;
    for (int numTestTopics = startTopic; numTestTopics < numGeneratingTopics + 2; numTestTopics++) {
      Path topicModelStateTempPath = getTestTempDirPath("topicTemp" + numTestTopics);
      Configuration conf = getConfiguration();
      CVB0Driver cvb0Driver = new CVB0Driver();
      cvb0Driver.run(conf, sampleCorpusPath, null, numTestTopics, numTerms,
          ALPHA, ETA, numIterations, 1, 0, null, null, topicModelStateTempPath, 1234, 0.2f, 2,
          1, 3, 1, false);
      perplexities.add(lowestPerplexity(conf, topicModelStateTempPath));
    }
    // Print before asserting so the perplexities are visible even when the assertion fails.
    System.out.println("Perplexities: " + Joiner.on(", ").join(perplexities));

    int bestTopic = -1;
    double lowest = Double.MAX_VALUE;
    for (int t = 0; t < perplexities.size(); t++) {
      if (perplexities.get(t) < lowest) {
        lowest = perplexities.get(t);
        bestTopic = t + startTopic;
      }
    }
    // NOTE(review): the asserted optimum is 4 (numGeneratingTopics + 1), not the generating topic
    // count itself; the failure message now describes what is actually checked.
    assertEquals("Unexpected number of topics with the lowest perplexity", 4, bestTopic);
  }

  /**
   * Returns the lowest perplexity recorded under {@code topicModelTemp}, reading iterations
   * 2, 3, ... until {@link CVB0Driver#readPerplexity} returns NaN.
   *
   * @return the minimum perplexity found, or {@link Double#MAX_VALUE} if none was recorded
   */
  private static double lowestPerplexity(Configuration conf, Path topicModelTemp)
    throws IOException {
    double lowest = Double.MAX_VALUE;
    for (int iteration = 2; ; iteration++) {
      double current = CVB0Driver.readPerplexity(conf, topicModelTemp, iteration);
      if (Double.isNaN(current)) {
        break;
      }
      lowest = Math.min(current, lowest);
    }
    return lowest;
  }

}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/test/java/org/apache/mahout/clustering/spectral/TestAffinityMatrixInputJob.java ---------------------------------------------------------------------- diff --git a/mr/src/test/java/org/apache/mahout/clustering/spectral/TestAffinityMatrixInputJob.java b/mr/src/test/java/org/apache/mahout/clustering/spectral/TestAffinityMatrixInputJob.java new file mode 100644 index 0000000..6e0cd18 --- /dev/null +++ b/mr/src/test/java/org/apache/mahout/clustering/spectral/TestAffinityMatrixInputJob.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.mahout.clustering.spectral;

import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.common.DummyRecordWriter;
import org.apache.mahout.common.MahoutTestCase;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
import org.apache.mahout.math.hadoop.DistributedRowMatrix.MatrixEntryWritable;
import org.junit.Test;

/**
 * <p>Tests the affinity matrix input M/R task.</p>
 *
 * <p>The tricky item with this task is that the format of the input
 * must be correct; it must take the form of a graph input, and for the
 * current implementation, the input must be symmetric, i.e. the weight
 * from node A to B = the weight from node B to A. This is not explicitly
 * enforced within the task itself (since, as of the time these tests were
 * written, we have not yet decided on a final rule regarding the
 * symmetry/non-symmetry of the affinity matrix, so we are unofficially
 * enforcing symmetry). Input looks something like this:</p>
 *
 * <pre>0, 0, 0
 * 0, 1, 10
 * 0, 2, 20
 * ...
 * 1, 0, 10
 * 2, 0, 20
 * ...</pre>
 *
 * <p>The mapper's task is simply to convert each line of text into a
 * DistributedRowMatrix entry, allowing the reducer to join each entry
 * of the same row into a VectorWritable.</p>
 *
 * <p>Exceptions are thrown in cases of bad input format: if there are
 * more or fewer than 3 numbers per line, or any of the numbers are missing.</p>
 */
public class TestAffinityMatrixInputJob extends MahoutTestCase {

  private static final String [] RAW = {"0,0,0", "0,1,5", "0,2,10", "1,0,5", "1,1,0",
                                        "1,2,20", "2,0,10", "2,1,20", "2,2,0"};
  private static final int RAW_DIMENSIONS = 3;

  /**
   * Runs every line of {@link #RAW} through a fresh {@link AffinityMatrixInputMapper} and
   * returns the writer that captured the mapper's output.
   */
  private static DummyRecordWriter<IntWritable, MatrixEntryWritable> mapRawInput(Configuration conf)
    throws Exception {
    AffinityMatrixInputMapper mapper = new AffinityMatrixInputMapper();
    DummyRecordWriter<IntWritable, MatrixEntryWritable> writer =
        new DummyRecordWriter<IntWritable, MatrixEntryWritable>();
    Mapper<LongWritable, Text, IntWritable, MatrixEntryWritable>.Context context =
        DummyRecordWriter.build(mapper, conf, writer);
    for (String line : RAW) {
      mapper.map(new LongWritable(), new Text(line), context);
    }
    return writer;
  }

  @Test
  public void testAffinityMatrixInputMapper() throws Exception {
    Configuration conf = getConfiguration();
    conf.setInt(Keys.AFFINITY_DIMENSIONS, RAW_DIMENSIONS);

    DummyRecordWriter<IntWritable, MatrixEntryWritable> writer = mapRawInput(conf);

    // one key per row, each key carrying one entry per column
    assertEquals("Number of map results", RAW_DIMENSIONS, writer.getData().size());
    Set<IntWritable> keys = writer.getData().keySet();
    for (IntWritable i : keys) {
      List<MatrixEntryWritable> row = writer.getData().get(i);
      assertEquals("Number of items in row", RAW_DIMENSIONS, row.size());
    }
  }

  @Test
  public void testAffinitymatrixInputReducer() throws Exception {
    Configuration conf = getConfiguration();
    conf.setInt(Keys.AFFINITY_DIMENSIONS, RAW_DIMENSIONS);

    DummyRecordWriter<IntWritable, MatrixEntryWritable> mapWriter = mapRawInput(conf);
    // keep the mapper output to check the reducer output against it
    Map<IntWritable, List<MatrixEntryWritable>> map = mapWriter.getData();

    // now reduce the data
    AffinityMatrixInputReducer reducer = new AffinityMatrixInputReducer();
    DummyRecordWriter<IntWritable, VectorWritable> redWriter =
        new DummyRecordWriter<IntWritable, VectorWritable>();
    Reducer<IntWritable, MatrixEntryWritable, IntWritable, VectorWritable>.Context redContext =
        DummyRecordWriter.build(reducer, conf, redWriter, IntWritable.class, MatrixEntryWritable.class);
    for (IntWritable key : mapWriter.getKeys()) {
      reducer.reduce(key, mapWriter.getValue(key), redContext);
    }

    // each row must be reduced into exactly one vector, and every element of that vector must
    // come from the entries the mapper emitted for the same row
    assertEquals("Number of reduce results", RAW_DIMENSIONS, redWriter.getData().size());
    for (IntWritable row : redWriter.getKeys()) {
      List<VectorWritable> list = redWriter.getValue(row);
      assertEquals("Should only be one vector", 1, list.size());
      Vector v = list.get(0).get();
      for (Vector.Element e : v.all()) {
        // NOTE(review): row -1 presumably mirrors what the mapper stores in the entry (the row
        // travels as the key); confirm against AffinityMatrixInputMapper / MatrixEntryWritable.equals
        MatrixEntryWritable toCompare = new MatrixEntryWritable();
        toCompare.setRow(-1);
        toCompare.setCol(e.index());
        toCompare.setVal(e.get());
        assertTrue("Reduced entry not found among the mapped entries of its row",
            map.get(row).contains(toCompare));
      }
    }
  }
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/test/java/org/apache/mahout/clustering/spectral/TestMatrixDiagonalizeJob.java ---------------------------------------------------------------------- diff --git a/mr/src/test/java/org/apache/mahout/clustering/spectral/TestMatrixDiagonalizeJob.java b/mr/src/test/java/org/apache/mahout/clustering/spectral/TestMatrixDiagonalizeJob.java new file mode 100644 index 0000000..7d4ec1f --- /dev/null +++
b/mr/src/test/java/org/apache/mahout/clustering/spectral/TestMatrixDiagonalizeJob.java @@ -0,0 +1,116 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.mahout.clustering.spectral;

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.clustering.spectral.MatrixDiagonalizeJob.MatrixDiagonalizeMapper;
import org.apache.mahout.clustering.spectral.MatrixDiagonalizeJob.MatrixDiagonalizeReducer;
import org.apache.mahout.common.DummyRecordWriter;
import org.apache.mahout.common.MahoutTestCase;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
import org.junit.Test;

/**
 * <p>Tests the MatrixDiagonalize task. For each row i of the input matrix the task computes
 * the sum of that row's elements, which becomes entry (i, i) of a diagonal matrix with the same
 * dimensions as the input. The reducer's result checked here is a single vector whose
 * element i holds the sum of row i.</p>
 */
public class TestMatrixDiagonalizeJob extends MahoutTestCase {

  private static final double[][] RAW = { {1, 2, 3}, {4, 5, 6}, {7, 8, 9} };
  private static final int RAW_DIMENSIONS = 3;

  /** Adds up every value of {@code row}, first to last. */
  private static double rowSum(double [] row) {
    double total = 0;
    for (int k = 0; k < row.length; k++) {
      total += row[k];
    }
    return total;
  }

  /** Builds a test configuration that declares the affinity dimensions of {@link #RAW}. */
  private Configuration configured() {
    Configuration conf = getConfiguration();
    conf.setInt(Keys.AFFINITY_DIMENSIONS, RAW_DIMENSIONS);
    return conf;
  }

  /**
   * Sends each row of {@link #RAW}, keyed by its row index, through a fresh
   * {@link MatrixDiagonalizeMapper} and returns the writer holding everything it emitted.
   */
  private static DummyRecordWriter<NullWritable, IntDoublePairWritable> mapAllRows(Configuration conf)
    throws Exception {
    MatrixDiagonalizeMapper mapper = new MatrixDiagonalizeMapper();
    DummyRecordWriter<NullWritable, IntDoublePairWritable> sink =
        new DummyRecordWriter<NullWritable, IntDoublePairWritable>();
    Mapper<IntWritable, VectorWritable, NullWritable, IntDoublePairWritable>.Context ctx =
        DummyRecordWriter.build(mapper, conf, sink);
    int rowIndex = 0;
    for (double[] values : RAW) {
      RandomAccessSparseVector rowVector = new RandomAccessSparseVector(RAW_DIMENSIONS);
      rowVector.assign(values);
      mapper.map(new IntWritable(rowIndex), new VectorWritable(rowVector), ctx);
      rowIndex++;
    }
    return sink;
  }

  @Test
  public void testMatrixDiagonalizeMapper() throws Exception {
    DummyRecordWriter<NullWritable, IntDoublePairWritable> sink = mapAllRows(configured());

    // every row yields one pair, all emitted under the single NullWritable key
    int emitted = sink.getValue(NullWritable.get()).size();
    assertEquals("Number of map results", RAW_DIMENSIONS, emitted);
  }

  @Test
  public void testMatrixDiagonalizeReducer() throws Exception {
    Configuration conf = configured();
    DummyRecordWriter<NullWritable, IntDoublePairWritable> mapSink = mapAllRows(conf);

    MatrixDiagonalizeReducer reducer = new MatrixDiagonalizeReducer();
    DummyRecordWriter<NullWritable, VectorWritable> reduceSink =
        new DummyRecordWriter<NullWritable, VectorWritable>();
    Reducer<NullWritable, IntDoublePairWritable, NullWritable, VectorWritable>.Context reduceCtx =
        DummyRecordWriter.build(reducer, conf, reduceSink, NullWritable.class, IntDoublePairWritable.class);

    // all map output shares one key, so a single reduce call covers it
    reducer.reduce(NullWritable.get(), mapSink.getValue(NullWritable.get()), reduceCtx);

    List<VectorWritable> results = reduceSink.getValue(NullWritable.get());
    assertEquals("Only a single resulting vector", 1, results.size());
    Vector diagonal = results.get(0).get();
    for (int idx = 0; idx < diagonal.size(); idx++) {
      assertEquals("Element sum is correct", rowSum(RAW[idx]), diagonal.get(idx), 0.01);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/test/java/org/apache/mahout/clustering/spectral/TestUnitVectorizerJob.java ---------------------------------------------------------------------- diff --git a/mr/src/test/java/org/apache/mahout/clustering/spectral/TestUnitVectorizerJob.java b/mr/src/test/java/org/apache/mahout/clustering/spectral/TestUnitVectorizerJob.java new file mode 100644 index 0000000..f317f6e --- /dev/null +++ b/mr/src/test/java/org/apache/mahout/clustering/spectral/TestUnitVectorizerJob.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.mahout.clustering.spectral;

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.clustering.spectral.UnitVectorizerJob.UnitVectorizerMapper;
import org.apache.mahout.common.DummyRecordWriter;
import org.apache.mahout.common.MahoutTestCase;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
import org.junit.Test;

/**
 * Tests that the UnitVectorizer mapper emits, for every input row, exactly one vector whose
 * L2 norm is 1 (within {@link #NORM_TOLERANCE}).
 */
public class TestUnitVectorizerJob extends MahoutTestCase {

  private static final double [][] RAW = { {1, 2, 3}, {4, 5, 6}, {7, 8, 9} };

  /** Maximum allowed deviation of an output vector's L2 norm from 1. */
  private static final double NORM_TOLERANCE = 0.000001;

  @Test
  public void testUnitVectorizerMapper() throws Exception {
    UnitVectorizerMapper mapper = new UnitVectorizerMapper();
    Configuration conf = getConfiguration();

    // set up the dummy writers
    DummyRecordWriter<IntWritable, VectorWritable> writer =
        new DummyRecordWriter<IntWritable, VectorWritable>();
    Mapper<IntWritable, VectorWritable, IntWritable, VectorWritable>.Context context =
        DummyRecordWriter.build(mapper, conf, writer);

    // perform the mapping, one row per key
    for (int i = 0; i < RAW.length; i++) {
      Vector vector = new RandomAccessSparseVector(RAW[i].length);
      vector.assign(RAW[i]);
      mapper.map(new IntWritable(i), new VectorWritable(vector), context);
    }

    // check the results
    assertEquals("Number of map results", RAW.length, writer.getData().size());
    for (int i = 0; i < RAW.length; i++) {
      List<VectorWritable> list = writer.getValue(new IntWritable(i));
      assertEquals("Only one element per row", 1, list.size());
      Vector v = list.get(0).get();
      assertEquals("L2 norm of the unit vector differs from 1", 1.0, v.norm(2), NORM_TOLERANCE);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/test/java/org/apache/mahout/clustering/spectral/TestVectorCache.java ---------------------------------------------------------------------- diff --git a/mr/src/test/java/org/apache/mahout/clustering/spectral/TestVectorCache.java b/mr/src/test/java/org/apache/mahout/clustering/spectral/TestVectorCache.java new file mode 100644 index 0000000..9091efe --- /dev/null +++ b/mr/src/test/java/org/apache/mahout/clustering/spectral/TestVectorCache.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
*/

package org.apache.mahout.clustering.spectral;

import java.net.URI;

import com.google.common.io.Closeables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.common.MahoutTestCase;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterator;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
import org.junit.Test;

/**
 * Round-trip tests for {@link VectorCache}:
 * <ul>
 *   <li>saving a vector and reading the resulting sequence file directly,</li>
 *   <li>loading a vector written by hand and registered in the distributed cache,</li>
 *   <li>saving and then loading through {@link VectorCache}.</li>
 * </ul>
 */
public class TestVectorCache extends MahoutTestCase {

  private static final double [] VECTOR = { 1, 2, 3, 4 };

  @Test
  public void testSave() throws Exception {
    Configuration conf = getConfiguration();
    Writable key = new IntWritable(0);
    Vector value = new DenseVector(VECTOR);
    Path path = getTestTempDirPath("output");

    // write the vector out
    VectorCache.save(key, value, path, conf, true, true);

    // read it back directly from the sequence file
    SequenceFileValueIterator<VectorWritable> iterator =
        new SequenceFileValueIterator<VectorWritable>(path, true, conf);
    try {
      assertTrue("No vector was saved", iterator.hasNext());
      VectorWritable old = iterator.next();
      // JUnit's argument order is (message, expected, actual)
      assertEquals("Saved vector is not identical to original", value, old.get());
    } finally {
      Closeables.close(iterator, true);
    }
  }

  @Test
  public void testLoad() throws Exception {
    // save a vector manually
    Configuration conf = getConfiguration();
    Writable key = new IntWritable(0);
    Vector value = new DenseVector(VECTOR);
    Path path = getTestTempDirPath("output");

    FileSystem fs = FileSystem.get(path.toUri(), conf);
    // write the vector to a fully qualified path, clearing out any leftover output first
    path = fs.makeQualified(path);
    fs.deleteOnExit(path);
    HadoopUtil.delete(conf, path);
    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path, IntWritable.class, VectorWritable.class);
    try {
      writer.append(key, new VectorWritable(value));
    } finally {
      Closeables.close(writer, false);
    }
    // register the file in the distributed cache so VectorCache.load(conf) can find it
    DistributedCache.setCacheFiles(new URI[] {path.toUri()}, conf);

    // load it
    Vector result = VectorCache.load(conf);

    // are they the same?
    assertNotNull("Vector is null", result);
    assertEquals("Loaded vector is not identical to original", value, result);
  }

  @Test
  public void testAll() throws Exception {
    Configuration conf = getConfiguration();
    Vector v = new DenseVector(VECTOR);
    Path toSave = getTestTempDirPath("output");
    Writable key = new IntWritable(0);

    // save it
    VectorCache.save(key, v, toSave, conf);

    // now, load it back
    Vector v2 = VectorCache.load(conf);

    // are they the same?
    assertNotNull("Vector is null", v2);
    assertEquals("Vectors are not identical", v, v2);
  }
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/test/java/org/apache/mahout/clustering/spectral/TestVectorMatrixMultiplicationJob.java ---------------------------------------------------------------------- diff --git a/mr/src/test/java/org/apache/mahout/clustering/spectral/TestVectorMatrixMultiplicationJob.java b/mr/src/test/java/org/apache/mahout/clustering/spectral/TestVectorMatrixMultiplicationJob.java new file mode 100644 index 0000000..2fd83e2 --- /dev/null +++ b/mr/src/test/java/org/apache/mahout/clustering/spectral/TestVectorMatrixMultiplicationJob.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
*/

package org.apache.mahout.clustering.spectral;

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.clustering.spectral.VectorMatrixMultiplicationJob.VectorMatrixMultiplicationMapper;
import org.apache.mahout.common.DummyRecordWriter;
import org.apache.mahout.common.MahoutTestCase;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
import org.junit.Test;

/**
 * <p>Checks the vector/matrix multiplication mapper. After the mapper is set up with a
 * vector d, mapping row i of a matrix M must emit a vector whose element j equals
 * sqrt(d[i]) * sqrt(d[j]) * M[i][j].</p>
 */
public class TestVectorMatrixMultiplicationJob extends MahoutTestCase {

  private static final double [][] MATRIX = { {1, 1}, {2, 3} };
  private static final double [] VECTOR = {9, 16};

  @Test
  public void testVectorMatrixMultiplicationMapper() throws Exception {
    VectorMatrixMultiplicationMapper mapper = new VectorMatrixMultiplicationMapper();
    Configuration conf = getConfiguration();

    // wire the mapper to an in-memory writer and hand it the scaling vector
    Vector toSave = new DenseVector(VECTOR);
    DummyRecordWriter<IntWritable, VectorWritable> writer =
        new DummyRecordWriter<IntWritable, VectorWritable>();
    Mapper<IntWritable, VectorWritable, IntWritable, VectorWritable>.Context context =
        DummyRecordWriter.build(mapper, conf, writer);
    mapper.setup(toSave);

    // feed each matrix row to the mapper, keyed by its row index
    int rowIndex = 0;
    for (double[] rowValues : MATRIX) {
      Vector input = new RandomAccessSparseVector(rowValues.length);
      input.assign(rowValues);
      mapper.map(new IntWritable(rowIndex++), new VectorWritable(input), context);
    }

    // one output vector per row, scaled on both sides by the square roots of VECTOR
    assertEquals("Number of map results", MATRIX.length, writer.getData().size());
    for (int row = 0; row < MATRIX.length; row++) {
      List<VectorWritable> emitted = writer.getValue(new IntWritable(row));
      assertEquals("Only one vector per key", 1, emitted.size());
      Vector product = emitted.get(0).get();
      double rowScale = Math.sqrt(VECTOR[row]);
      for (int col = 0; col < MATRIX[row].length; col++) {
        double expected = rowScale * Math.sqrt(VECTOR[col]) * MATRIX[row][col];
        assertEquals("Product matrix elements", expected, product.get(col), EPSILON);
      }
    }
  }
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/test/java/org/apache/mahout/clustering/spectral/kmeans/TestEigenSeedGenerator.java ---------------------------------------------------------------------- diff --git a/mr/src/test/java/org/apache/mahout/clustering/spectral/kmeans/TestEigenSeedGenerator.java b/mr/src/test/java/org/apache/mahout/clustering/spectral/kmeans/TestEigenSeedGenerator.java new file mode 100644 index 0000000..4075fe4 --- /dev/null +++ b/mr/src/test/java/org/apache/mahout/clustering/spectral/kmeans/TestEigenSeedGenerator.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
*/

package org.apache.mahout.clustering.spectral.kmeans;

import java.util.Collection;
import java.util.HashSet;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.mahout.clustering.Cluster;
import org.apache.mahout.clustering.ClusteringTestUtils;
import org.apache.mahout.clustering.iterator.ClusterWritable;
import org.apache.mahout.clustering.spectral.kmeans.EigenSeedGenerator;
import org.apache.mahout.common.MahoutTestCase;
import org.apache.mahout.common.distance.ManhattanDistanceMeasure;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterable;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
import org.junit.Before;
import org.junit.Test;

import com.google.common.collect.Lists;

/**
 * Tests {@link EigenSeedGenerator} on copies of the three unit vectors of R^3. The generated
 * seed clusters must have unique ids, be exactly {@link #NUM_SEEDS} in number, and have
 * pairwise orthogonal centers.
 */
public final class TestEigenSeedGenerator extends MahoutTestCase {

  private static final double[][] RAW = {{1, 0, 0}, {1, 0, 0}, {0, 1, 0}, {0, 1, 0},
                                         {0, 1, 0}, {0, 0, 1}, {0, 0, 1}};

  /** Number of seed clusters requested from, and expected back from, the generator. */
  private static final int NUM_SEEDS = 3;

  private FileSystem fs;

  /** Wraps each row of {@link #RAW} in a sparse {@link VectorWritable}. */
  private static List<VectorWritable> getPoints() {
    List<VectorWritable> points = Lists.newArrayList();
    for (double[] fr : RAW) {
      Vector vec = new RandomAccessSparseVector(fr.length);
      vec.assign(fr);
      points.add(new VectorWritable(vec));
    }
    return points;
  }

  @Override
  @Before
  public void setUp() throws Exception {
    super.setUp();
    Configuration conf = getConfiguration();
    fs = FileSystem.get(conf);
  }

  @Test
  public void testEigenSeedGenerator() throws Exception {
    List<VectorWritable> points = getPoints();
    Job job = new Job();
    Configuration conf = job.getConfiguration();
    job.setMapOutputValueClass(VectorWritable.class);
    Path input = getTestTempFilePath("eigen-input");
    Path output = getTestTempDirPath("eigen-output");
    ClusteringTestUtils.writePointsToFile(points, input, fs, conf);

    EigenSeedGenerator.buildFromEigens(conf, input, output, NUM_SEEDS, new ManhattanDistanceMeasure());

    int clusterCount = 0;
    Collection<Integer> set = new HashSet<Integer>();
    Vector[] v = new Vector[NUM_SEEDS];
    for (ClusterWritable clusterWritable :
         new SequenceFileValueIterable<ClusterWritable>(new Path(output, "part-eigenSeed"), true, conf)) {
      Cluster cluster = clusterWritable.getValue();
      int id = cluster.getId();
      assertTrue("Duplicate cluster id " + id, set.add(id));
      // guard the array write below so an unexpected id fails the test instead of throwing
      assertTrue("Cluster id out of range: " + id, id >= 0 && id < v.length);
      v[id] = cluster.getCenter();
      clusterCount++;
    }
    assertEquals("Number of seed clusters", NUM_SEEDS, clusterCount);

    // validate pair-wise orthogonality of the seed centers
    for (int i = 0; i < v.length; i++) {
      for (int j = i + 1; j < v.length; j++) {
        assertEquals("Centers " + i + " and " + j + " are not orthogonal", 0, v[i].dot(v[j]), 1E-10);
      }
    }
  }

}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/test/java/org/apache/mahout/clustering/streaming/cluster/BallKMeansTest.java ---------------------------------------------------------------------- diff --git a/mr/src/test/java/org/apache/mahout/clustering/streaming/cluster/BallKMeansTest.java b/mr/src/test/java/org/apache/mahout/clustering/streaming/cluster/BallKMeansTest.java new file mode 100644 index 0000000..340ca8e --- /dev/null +++ b/mr/src/test/java/org/apache/mahout/clustering/streaming/cluster/BallKMeansTest.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.clustering.streaming.cluster; + +import java.util.List; + +import com.google.common.collect.Lists; +import org.apache.mahout.clustering.ClusteringUtils; +import org.apache.mahout.common.Pair; +import org.apache.mahout.common.RandomUtils; +import org.apache.mahout.common.distance.EuclideanDistanceMeasure; +import org.apache.mahout.common.distance.SquaredEuclideanDistanceMeasure; +import org.apache.mahout.math.Centroid; +import org.apache.mahout.math.ConstantVector; +import org.apache.mahout.math.DenseMatrix; +import org.apache.mahout.math.DenseVector; +import org.apache.mahout.math.Matrix; +import org.apache.mahout.math.SingularValueDecomposition; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.WeightedVector; +import org.apache.mahout.math.function.Functions; +import org.apache.mahout.math.function.VectorFunction; +import org.apache.mahout.math.neighborhood.BruteSearch; +import org.apache.mahout.math.neighborhood.Searcher; +import org.apache.mahout.math.neighborhood.UpdatableSearcher; +import org.apache.mahout.math.random.MultiNormal; +import org.apache.mahout.math.random.WeightedThing; +import org.apache.mahout.math.stats.OnlineSummarizer; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.apache.mahout.clustering.ClusteringUtils.totalWeight; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class BallKMeansTest { + private static final int NUM_DATA_POINTS = 10000; + private static final int 
NUM_DIMENSIONS = 4; + private static final int NUM_ITERATIONS = 20; + private static final double DISTRIBUTION_RADIUS = 0.01; + + @BeforeClass + public static void setUp() { + RandomUtils.useTestSeed(); + syntheticData = DataUtils.sampleMultiNormalHypercube(NUM_DIMENSIONS, NUM_DATA_POINTS, DISTRIBUTION_RADIUS); + + } + + private static Pair<List<Centroid>, List<Centroid>> syntheticData; + private static final int K1 = 100; + + + @Test + public void testClusteringMultipleRuns() { + for (int i = 1; i <= 10; ++i) { + BallKMeans clusterer = new BallKMeans(new BruteSearch(new SquaredEuclideanDistanceMeasure()), + 1 << NUM_DIMENSIONS, NUM_ITERATIONS, true, i); + clusterer.cluster(syntheticData.getFirst()); + double costKMeansPlusPlus = ClusteringUtils.totalClusterCost(syntheticData.getFirst(), clusterer); + + clusterer = new BallKMeans(new BruteSearch(new SquaredEuclideanDistanceMeasure()), + 1 << NUM_DIMENSIONS, NUM_ITERATIONS, false, i); + clusterer.cluster(syntheticData.getFirst()); + double costKMeansRandom = ClusteringUtils.totalClusterCost(syntheticData.getFirst(), clusterer); + + System.out.printf("%d runs; kmeans++: %f; random: %f\n", i, costKMeansPlusPlus, costKMeansRandom); + assertTrue("kmeans++ cost should be less than random cost", costKMeansPlusPlus < costKMeansRandom); + } + } + + @Test + public void testClustering() { + UpdatableSearcher searcher = new BruteSearch(new SquaredEuclideanDistanceMeasure()); + BallKMeans clusterer = new BallKMeans(searcher, 1 << NUM_DIMENSIONS, NUM_ITERATIONS); + + long startTime = System.currentTimeMillis(); + Pair<List<Centroid>, List<Centroid>> data = syntheticData; + clusterer.cluster(data.getFirst()); + long endTime = System.currentTimeMillis(); + + long hash = 0; + for (Centroid centroid : data.getFirst()) { + for (Vector.Element element : centroid.all()) { + hash = 31 * hash + 17 * element.index() + Double.toHexString(element.get()).hashCode(); + } + } + System.out.printf("Hash = %08x\n", hash); + + assertEquals("Total 
weight not preserved", totalWeight(syntheticData.getFirst()), totalWeight(clusterer), 1.0e-9); + + // Verify that each corner of the cube has a centroid very nearby. + // This is probably FALSE for large-dimensional spaces! + OnlineSummarizer summarizer = new OnlineSummarizer(); + for (Vector mean : syntheticData.getSecond()) { + WeightedThing<Vector> v = searcher.search(mean, 1).get(0); + summarizer.add(v.getWeight()); + } + assertTrue(String.format("Median weight [%f] too large [>%f]", summarizer.getMedian(), + DISTRIBUTION_RADIUS), summarizer.getMedian() < DISTRIBUTION_RADIUS); + + double clusterTime = (endTime - startTime) / 1000.0; + System.out.printf("%s\n%.2f for clustering\n%.1f us per row\n\n", + searcher.getClass().getName(), clusterTime, + clusterTime / syntheticData.getFirst().size() * 1.0e6); + + // Verify that the total weight of the centroids near each corner is correct. + double[] cornerWeights = new double[1 << NUM_DIMENSIONS]; + Searcher trueFinder = new BruteSearch(new EuclideanDistanceMeasure()); + for (Vector trueCluster : syntheticData.getSecond()) { + trueFinder.add(trueCluster); + } + for (Centroid centroid : clusterer) { + WeightedThing<Vector> closest = trueFinder.search(centroid, 1).get(0); + cornerWeights[((Centroid)closest.getValue()).getIndex()] += centroid.getWeight(); + } + int expectedNumPoints = NUM_DATA_POINTS / (1 << NUM_DIMENSIONS); + for (double v : cornerWeights) { + System.out.printf("%f ", v); + } + System.out.println(); + for (double v : cornerWeights) { + assertEquals(expectedNumPoints, v, 0); + } + } + + @Test + public void testInitialization() { + // Start with super clusterable data. + List<? extends WeightedVector> data = cubishTestData(0.01); + + // Just do initialization of ball k-means. This should drop a point into each of the clusters. + BallKMeans r = new BallKMeans(new BruteSearch(new SquaredEuclideanDistanceMeasure()), 6, 20); + r.cluster(data); + + // Put the centroids into a matrix. 
+ Matrix x = new DenseMatrix(6, 5); + int row = 0; + for (Centroid c : r) { + x.viewRow(row).assign(c.viewPart(0, 5)); + row++; + } + + // Verify that each column looks right. Should contain zeros except for a single 6. + final Vector columnNorms = x.aggregateColumns(new VectorFunction() { + @Override + public double apply(Vector f) { + // Return the sum of three discrepancy measures. + return Math.abs(f.minValue()) + Math.abs(f.maxValue() - 6) + Math.abs(f.norm(1) - 6); + } + }); + // Verify all errors are nearly zero. + assertEquals(0, columnNorms.norm(1) / columnNorms.size(), 0.1); + + // Verify that the centroids are a permutation of the original ones. + SingularValueDecomposition svd = new SingularValueDecomposition(x); + Vector s = svd.getS().viewDiagonal().assign(Functions.div(6)); + assertEquals(5, s.getLengthSquared(), 0.05); + assertEquals(5, s.norm(1), 0.05); + } + + private static List<? extends WeightedVector> cubishTestData(double radius) { + List<WeightedVector> data = Lists.newArrayListWithCapacity(K1 + 5000); + int row = 0; + + MultiNormal g = new MultiNormal(radius, new ConstantVector(0, 10)); + for (int i = 0; i < K1; i++) { + data.add(new WeightedVector(g.sample(), 1, row++)); + } + + for (int i = 0; i < 5; i++) { + Vector m = new DenseVector(10); + m.set(i, 6); // This was originally i == 0 ? 
6 : 6 which can't be right + MultiNormal gx = new MultiNormal(radius, m); + for (int j = 0; j < 1000; j++) { + data.add(new WeightedVector(gx.sample(), 1, row++)); + } + } + return data; + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/test/java/org/apache/mahout/clustering/streaming/cluster/DataUtils.java ---------------------------------------------------------------------- diff --git a/mr/src/test/java/org/apache/mahout/clustering/streaming/cluster/DataUtils.java b/mr/src/test/java/org/apache/mahout/clustering/streaming/cluster/DataUtils.java new file mode 100644 index 0000000..2257541 --- /dev/null +++ b/mr/src/test/java/org/apache/mahout/clustering/streaming/cluster/DataUtils.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.clustering.streaming.cluster; + +import java.util.List; + +import com.google.common.collect.Lists; +import org.apache.mahout.common.Pair; +import org.apache.mahout.math.Centroid; +import org.apache.mahout.math.DenseVector; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.random.MultiNormal; + +/** + * A collection of miscellaneous utility functions for working with data to be clustered. 
+ * Includes methods for generating synthetic data and estimating distance cutoff. + */ +public final class DataUtils { + private DataUtils() { + } + + /** + * Samples numDatapoints vectors of numDimensions cardinality centered around the vertices of a + * numDimensions order hypercube. The distribution of points around these vertices is + * multinormal with a radius of distributionRadius. + * A hypercube of numDimensions has 2^numDimensions vertices. Keep this in mind when clustering + * the data. + * + * Note that it is almost always the case that you want to call RandomUtils.useTestSeed() before + * generating test data. This means that you can't generate data in the declaration of a static + * variable because such initializations happen before any @BeforeClass or @Before setup methods + * are called. + * + * + * @param numDimensions number of dimensions of the vectors to be generated. + * @param numDatapoints number of data points to be generated. + * @param distributionRadius radius of the distribution around the hypercube vertices. + * @return a pair of lists, whose first element is the sampled points and whose second element + * is the list of hypercube vertices that are the means of each distribution. + */ + public static Pair<List<Centroid>, List<Centroid>> sampleMultiNormalHypercube( + int numDimensions, int numDatapoints, double distributionRadius) { + int pow2N = 1 << numDimensions; + // Construct data samplers centered on the corners of a unit hypercube. + // Additionally, keep the means of the distributions that will be generated so we can compare + // these to the ideal cluster centers. 
+ List<Centroid> mean = Lists.newArrayListWithCapacity(pow2N); + List<MultiNormal> rowSamplers = Lists.newArrayList(); + for (int i = 0; i < pow2N; i++) { + Vector v = new DenseVector(numDimensions); + // Select each of the num + int pow2J = 1 << (numDimensions - 1); + for (int j = 0; j < numDimensions; ++j) { + v.set(j, 1.0 / pow2J * (i & pow2J)); + pow2J >>= 1; + } + mean.add(new Centroid(i, v, 1)); + rowSamplers.add(new MultiNormal(distributionRadius, v)); + } + + // Sample the requested number of data points. + List<Centroid> data = Lists.newArrayListWithCapacity(numDatapoints); + for (int i = 0; i < numDatapoints; ++i) { + data.add(new Centroid(i, rowSamplers.get(i % pow2N).sample(), 1)); + } + return new Pair<List<Centroid>, List<Centroid>>(data, mean); + } + + /** + * Calls sampleMultinormalHypercube(numDimension, numDataPoints, 0.01). + * @see DataUtils#sampleMultiNormalHypercube(int, int, double) + */ + public static Pair<List<Centroid>, List<Centroid>> sampleMultiNormalHypercube(int numDimensions, + int numDatapoints) { + return sampleMultiNormalHypercube(numDimensions, numDatapoints, 0.01); + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/test/java/org/apache/mahout/clustering/streaming/cluster/StreamingKMeansTest.java ---------------------------------------------------------------------- diff --git a/mr/src/test/java/org/apache/mahout/clustering/streaming/cluster/StreamingKMeansTest.java b/mr/src/test/java/org/apache/mahout/clustering/streaming/cluster/StreamingKMeansTest.java new file mode 100644 index 0000000..cf9263c --- /dev/null +++ b/mr/src/test/java/org/apache/mahout/clustering/streaming/cluster/StreamingKMeansTest.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.clustering.streaming.cluster; + + +import java.util.Arrays; +import java.util.List; + +import org.apache.mahout.clustering.ClusteringUtils; +import org.apache.mahout.common.Pair; +import org.apache.mahout.common.RandomUtils; +import org.apache.mahout.common.distance.EuclideanDistanceMeasure; +import org.apache.mahout.common.distance.SquaredEuclideanDistanceMeasure; +import org.apache.mahout.math.Centroid; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.neighborhood.BruteSearch; +import org.apache.mahout.math.neighborhood.FastProjectionSearch; +import org.apache.mahout.math.neighborhood.ProjectionSearch; +import org.apache.mahout.math.neighborhood.Searcher; +import org.apache.mahout.math.neighborhood.UpdatableSearcher; +import org.apache.mahout.math.random.WeightedThing; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.runners.Parameterized.Parameters; + + +@RunWith(Parameterized.class) +public class StreamingKMeansTest { + private static final int NUM_DATA_POINTS = 1 << 16; + private static final int NUM_DIMENSIONS = 6; + private static final int NUM_PROJECTIONS = 2; + private static final int SEARCH_SIZE = 10; + + private static 
Pair<List<Centroid>, List<Centroid>> syntheticData ; + + @Before + public void setUp() { + RandomUtils.useTestSeed(); + syntheticData = + DataUtils.sampleMultiNormalHypercube(NUM_DIMENSIONS, NUM_DATA_POINTS); + } + + private UpdatableSearcher searcher; + private boolean allAtOnce; + + public StreamingKMeansTest(UpdatableSearcher searcher, boolean allAtOnce) { + this.searcher = searcher; + this.allAtOnce = allAtOnce; + } + + @Parameters + public static List<Object[]> generateData() { + return Arrays.asList(new Object[][] { + {new ProjectionSearch(new SquaredEuclideanDistanceMeasure(), NUM_PROJECTIONS, SEARCH_SIZE), true}, + {new FastProjectionSearch(new SquaredEuclideanDistanceMeasure(), NUM_PROJECTIONS, SEARCH_SIZE), + true}, + {new ProjectionSearch(new SquaredEuclideanDistanceMeasure(), NUM_PROJECTIONS, SEARCH_SIZE), false}, + {new FastProjectionSearch(new SquaredEuclideanDistanceMeasure(), NUM_PROJECTIONS, SEARCH_SIZE), + false}, + }); + } + + @Test + public void testAverageDistanceCutoff() { + double avgDistanceCutoff = 0; + double avgNumClusters = 0; + int numTests = 1; + System.out.printf("Distance cutoff for %s\n", searcher.getClass().getName()); + for (int i = 0; i < numTests; ++i) { + searcher.clear(); + int numStreamingClusters = (int)Math.log(syntheticData.getFirst().size()) * (1 << + NUM_DIMENSIONS); + double distanceCutoff = 1.0e-6; + double estimatedCutoff = ClusteringUtils.estimateDistanceCutoff(syntheticData.getFirst(), + searcher.getDistanceMeasure(), 100); + System.out.printf("[%d] Generated synthetic data [magic] %f [estimate] %f\n", i, distanceCutoff, estimatedCutoff); + StreamingKMeans clusterer = + new StreamingKMeans(searcher, numStreamingClusters, estimatedCutoff); + clusterer.cluster(syntheticData.getFirst()); + avgDistanceCutoff += clusterer.getDistanceCutoff(); + avgNumClusters += clusterer.getNumClusters(); + System.out.printf("[%d] %f\n", i, clusterer.getDistanceCutoff()); + } + avgDistanceCutoff /= numTests; + avgNumClusters /= 
numTests; + System.out.printf("Final: distanceCutoff: %f estNumClusters: %f\n", avgDistanceCutoff, avgNumClusters); + } + + @Test + public void testClustering() { + searcher.clear(); + int numStreamingClusters = (int)Math.log(syntheticData.getFirst().size()) * (1 << NUM_DIMENSIONS); + System.out.printf("k log n = %d\n", numStreamingClusters); + double estimatedCutoff = ClusteringUtils.estimateDistanceCutoff(syntheticData.getFirst(), + searcher.getDistanceMeasure(), 100); + StreamingKMeans clusterer = + new StreamingKMeans(searcher, numStreamingClusters, estimatedCutoff); + + long startTime = System.currentTimeMillis(); + if (allAtOnce) { + clusterer.cluster(syntheticData.getFirst()); + } else { + for (Centroid datapoint : syntheticData.getFirst()) { + clusterer.cluster(datapoint); + } + } + long endTime = System.currentTimeMillis(); + + System.out.printf("%s %s\n", searcher.getClass().getName(), searcher.getDistanceMeasure() + .getClass().getName()); + System.out.printf("Total number of clusters %d\n", clusterer.getNumClusters()); + + System.out.printf("Weights: %f %f\n", ClusteringUtils.totalWeight(syntheticData.getFirst()), + ClusteringUtils.totalWeight(clusterer)); + assertEquals("Total weight not preserved", ClusteringUtils.totalWeight(syntheticData.getFirst()), + ClusteringUtils.totalWeight(clusterer), 1.0e-9); + + // and verify that each corner of the cube has a centroid very nearby + double maxWeight = 0; + for (Vector mean : syntheticData.getSecond()) { + WeightedThing<Vector> v = searcher.search(mean, 1).get(0); + maxWeight = Math.max(v.getWeight(), maxWeight); + } + assertTrue("Maximum weight too large " + maxWeight, maxWeight < 0.05); + double clusterTime = (endTime - startTime) / 1000.0; + System.out.printf("%s\n%.2f for clustering\n%.1f us per row\n\n", + searcher.getClass().getName(), clusterTime, + clusterTime / syntheticData.getFirst().size() * 1.0e6); + + // verify that the total weight of the centroids near each corner is correct + double[] 
cornerWeights = new double[1 << NUM_DIMENSIONS]; + Searcher trueFinder = new BruteSearch(new EuclideanDistanceMeasure()); + for (Vector trueCluster : syntheticData.getSecond()) { + trueFinder.add(trueCluster); + } + for (Centroid centroid : clusterer) { + WeightedThing<Vector> closest = trueFinder.search(centroid, 1).get(0); + cornerWeights[((Centroid)closest.getValue()).getIndex()] += centroid.getWeight(); + } + int expectedNumPoints = NUM_DATA_POINTS / (1 << NUM_DIMENSIONS); + for (double v : cornerWeights) { + System.out.printf("%f ", v); + } + System.out.println(); + for (double v : cornerWeights) { + assertEquals(expectedNumPoints, v, 0); + } + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/test/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansTestMR.java ---------------------------------------------------------------------- diff --git a/mr/src/test/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansTestMR.java b/mr/src/test/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansTestMR.java new file mode 100644 index 0000000..9b582b4 --- /dev/null +++ b/mr/src/test/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansTestMR.java @@ -0,0 +1,283 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.clustering.streaming.mapreduce; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mrunit.mapreduce.MapDriver; +import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver; +import org.apache.hadoop.mrunit.mapreduce.ReduceDriver; +import org.apache.mahout.clustering.ClusteringUtils; +import org.apache.mahout.clustering.streaming.cluster.DataUtils; +import org.apache.mahout.clustering.streaming.cluster.StreamingKMeans; +import org.apache.mahout.common.MahoutTestCase; +import org.apache.mahout.common.Pair; +import org.apache.mahout.common.RandomUtils; +import org.apache.mahout.common.commandline.DefaultOptionCreator; +import org.apache.mahout.common.distance.SquaredEuclideanDistanceMeasure; +import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable; +import org.apache.mahout.math.Centroid; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorWritable; +import org.apache.mahout.math.neighborhood.BruteSearch; +import org.apache.mahout.math.neighborhood.FastProjectionSearch; +import org.apache.mahout.math.neighborhood.LocalitySensitiveHashSearch; +import org.apache.mahout.math.neighborhood.ProjectionSearch; +import org.apache.mahout.math.random.WeightedThing; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +@RunWith(Parameterized.class) +public class StreamingKMeansTestMR extends 
MahoutTestCase { + private static final int NUM_DATA_POINTS = 1 << 15; + private static final int NUM_DIMENSIONS = 8; + private static final int NUM_PROJECTIONS = 3; + private static final int SEARCH_SIZE = 5; + private static final int MAX_NUM_ITERATIONS = 10; + private static final double DISTANCE_CUTOFF = 1.0e-6; + + private static Pair<List<Centroid>, List<Centroid>> syntheticData; + + @Before + public void setUp() { + RandomUtils.useTestSeed(); + syntheticData = + DataUtils.sampleMultiNormalHypercube(NUM_DIMENSIONS, NUM_DATA_POINTS, 1.0e-4); + } + + private final String searcherClassName; + private final String distanceMeasureClassName; + + public StreamingKMeansTestMR(String searcherClassName, String distanceMeasureClassName) { + this.searcherClassName = searcherClassName; + this.distanceMeasureClassName = distanceMeasureClassName; + } + + private void configure(Configuration configuration) { + configuration.set(DefaultOptionCreator.DISTANCE_MEASURE_OPTION, distanceMeasureClassName); + configuration.setInt(StreamingKMeansDriver.SEARCH_SIZE_OPTION, SEARCH_SIZE); + configuration.setInt(StreamingKMeansDriver.NUM_PROJECTIONS_OPTION, NUM_PROJECTIONS); + configuration.set(StreamingKMeansDriver.SEARCHER_CLASS_OPTION, searcherClassName); + configuration.setInt(DefaultOptionCreator.NUM_CLUSTERS_OPTION, 1 << NUM_DIMENSIONS); + configuration.setInt(StreamingKMeansDriver.ESTIMATED_NUM_MAP_CLUSTERS, + (1 << NUM_DIMENSIONS) * (int)Math.log(NUM_DATA_POINTS)); + configuration.setFloat(StreamingKMeansDriver.ESTIMATED_DISTANCE_CUTOFF, (float) DISTANCE_CUTOFF); + configuration.setInt(StreamingKMeansDriver.MAX_NUM_ITERATIONS, MAX_NUM_ITERATIONS); + + // Collapse the Centroids in the reducer. 
+ configuration.setBoolean(StreamingKMeansDriver.REDUCE_STREAMING_KMEANS, true); + } + + @Parameterized.Parameters + public static List<Object[]> generateData() { + return Arrays.asList(new Object[][]{ + {ProjectionSearch.class.getName(), SquaredEuclideanDistanceMeasure.class.getName()}, + {FastProjectionSearch.class.getName(), SquaredEuclideanDistanceMeasure.class.getName()}, + {LocalitySensitiveHashSearch.class.getName(), SquaredEuclideanDistanceMeasure.class.getName()}, + }); + } + + @Test + public void testHypercubeMapper() throws IOException { + MapDriver<Writable, VectorWritable, IntWritable, CentroidWritable> mapDriver = + MapDriver.newMapDriver(new StreamingKMeansMapper()); + configure(mapDriver.getConfiguration()); + System.out.printf("%s mapper test\n", + mapDriver.getConfiguration().get(StreamingKMeansDriver.SEARCHER_CLASS_OPTION)); + for (Centroid datapoint : syntheticData.getFirst()) { + mapDriver.addInput(new IntWritable(0), new VectorWritable(datapoint)); + } + List<org.apache.hadoop.mrunit.types.Pair<IntWritable,CentroidWritable>> results = mapDriver.run(); + BruteSearch resultSearcher = new BruteSearch(new SquaredEuclideanDistanceMeasure()); + for (org.apache.hadoop.mrunit.types.Pair<IntWritable, CentroidWritable> result : results) { + resultSearcher.add(result.getSecond().getCentroid()); + } + System.out.printf("Clustered the data into %d clusters\n", results.size()); + for (Vector mean : syntheticData.getSecond()) { + WeightedThing<Vector> closest = resultSearcher.search(mean, 1).get(0); + assertTrue("Weight " + closest.getWeight() + " not less than 0.5", closest.getWeight() < 0.5); + } + } + + @Test + public void testMapperVsLocal() throws IOException { + // Clusters the data using the StreamingKMeansMapper. 
+ MapDriver<Writable, VectorWritable, IntWritable, CentroidWritable> mapDriver = + MapDriver.newMapDriver(new StreamingKMeansMapper()); + Configuration configuration = mapDriver.getConfiguration(); + configure(configuration); + System.out.printf("%s mapper vs local test\n", + mapDriver.getConfiguration().get(StreamingKMeansDriver.SEARCHER_CLASS_OPTION)); + + for (Centroid datapoint : syntheticData.getFirst()) { + mapDriver.addInput(new IntWritable(0), new VectorWritable(datapoint)); + } + List<Centroid> mapperCentroids = Lists.newArrayList(); + for (org.apache.hadoop.mrunit.types.Pair<IntWritable, CentroidWritable> pair : mapDriver.run()) { + mapperCentroids.add(pair.getSecond().getCentroid()); + } + + // Clusters the data using local batch StreamingKMeans. + StreamingKMeans batchClusterer = + new StreamingKMeans(StreamingKMeansUtilsMR.searcherFromConfiguration(configuration), + mapDriver.getConfiguration().getInt("estimatedNumMapClusters", -1), DISTANCE_CUTOFF); + batchClusterer.cluster(syntheticData.getFirst()); + List<Centroid> batchCentroids = Lists.newArrayList(); + for (Vector v : batchClusterer) { + batchCentroids.add((Centroid) v); + } + + // Clusters the data using point by point StreamingKMeans. + StreamingKMeans perPointClusterer = + new StreamingKMeans(StreamingKMeansUtilsMR.searcherFromConfiguration(configuration), + (1 << NUM_DIMENSIONS) * (int)Math.log(NUM_DATA_POINTS), DISTANCE_CUTOFF); + for (Centroid datapoint : syntheticData.getFirst()) { + perPointClusterer.cluster(datapoint); + } + List<Centroid> perPointCentroids = Lists.newArrayList(); + for (Vector v : perPointClusterer) { + perPointCentroids.add((Centroid) v); + } + + // Computes the cost (total sum of distances) of these different clusterings. 
+ double mapperCost = ClusteringUtils.totalClusterCost(syntheticData.getFirst(), mapperCentroids); + double localCost = ClusteringUtils.totalClusterCost(syntheticData.getFirst(), batchCentroids); + double perPointCost = ClusteringUtils.totalClusterCost(syntheticData.getFirst(), perPointCentroids); + System.out.printf("[Total cost] Mapper %f [%d] Local %f [%d] Perpoint local %f [%d];" + + "[ratio m-vs-l %f] [ratio pp-vs-l %f]\n", mapperCost, mapperCentroids.size(), + localCost, batchCentroids.size(), perPointCost, perPointCentroids.size(), + mapperCost / localCost, perPointCost / localCost); + + // These ratios should be close to 1.0 and have been observed to be go as low as 0.6 and as low as 1.5. + // A buffer of [0.2, 1.8] seems appropriate. + assertEquals("Mapper StreamingKMeans / Batch local StreamingKMeans total cost ratio too far from 1", + 1.0, mapperCost / localCost, 0.8); + assertEquals("One by one local StreamingKMeans / Batch local StreamingKMeans total cost ratio too high", + 1.0, perPointCost / localCost, 0.8); + } + + @Test + public void testHypercubeReducer() throws IOException { + ReduceDriver<IntWritable, CentroidWritable, IntWritable, CentroidWritable> reduceDriver = + ReduceDriver.newReduceDriver(new StreamingKMeansReducer()); + Configuration configuration = reduceDriver.getConfiguration(); + configure(configuration); + + System.out.printf("%s reducer test\n", configuration.get(StreamingKMeansDriver.SEARCHER_CLASS_OPTION)); + StreamingKMeans clusterer = + new StreamingKMeans(StreamingKMeansUtilsMR .searcherFromConfiguration(configuration), + (1 << NUM_DIMENSIONS) * (int)Math.log(NUM_DATA_POINTS), DISTANCE_CUTOFF); + + long start = System.currentTimeMillis(); + clusterer.cluster(syntheticData.getFirst()); + long end = System.currentTimeMillis(); + + System.out.printf("%f [s]\n", (end - start) / 1000.0); + List<CentroidWritable> reducerInputs = Lists.newArrayList(); + int postMapperTotalWeight = 0; + for (Centroid intermediateCentroid : clusterer) { 
+ reducerInputs.add(new CentroidWritable(intermediateCentroid)); + postMapperTotalWeight += intermediateCentroid.getWeight(); + } + + reduceDriver.addInput(new IntWritable(0), reducerInputs); + List<org.apache.hadoop.mrunit.types.Pair<IntWritable, CentroidWritable>> results = + reduceDriver.run(); + testReducerResults(postMapperTotalWeight, results); + } + + @Test + public void testHypercubeMapReduce() throws IOException { + MapReduceDriver<Writable, VectorWritable, IntWritable, CentroidWritable, IntWritable, CentroidWritable> + mapReduceDriver = new MapReduceDriver<Writable, VectorWritable, IntWritable, CentroidWritable, + IntWritable, CentroidWritable>(new StreamingKMeansMapper(), new StreamingKMeansReducer()); + Configuration configuration = mapReduceDriver.getConfiguration(); + configure(configuration); + + System.out.printf("%s full test\n", configuration.get(StreamingKMeansDriver.SEARCHER_CLASS_OPTION)); + for (Centroid datapoint : syntheticData.getFirst()) { + mapReduceDriver.addInput(new IntWritable(0), new VectorWritable(datapoint)); + } + List<org.apache.hadoop.mrunit.types.Pair<IntWritable, CentroidWritable>> results = mapReduceDriver.run(); + testReducerResults(syntheticData.getFirst().size(), results); + } + + @Test + public void testHypercubeMapReduceRunSequentially() throws Exception { + Configuration configuration = getConfiguration(); + configure(configuration); + configuration.set(DefaultOptionCreator.METHOD_OPTION, DefaultOptionCreator.SEQUENTIAL_METHOD); + + Path inputPath = new Path("testInput"); /* NOTE(review): relative paths resolve against the working directory and this method does not delete them afterwards; consider a test temp location */ + Path outputPath = new Path("testOutput"); + StreamingKMeansUtilsMR.writeVectorsToSequenceFile(syntheticData.getFirst(), inputPath, configuration); + + StreamingKMeansDriver.run(configuration, inputPath, outputPath); + + testReducerResults(syntheticData.getFirst().size(), + Lists.newArrayList(Iterables.transform( + new SequenceFileIterable<IntWritable, CentroidWritable>(outputPath, configuration), + new Function< + Pair<IntWritable, CentroidWritable>,
+ org.apache.hadoop.mrunit.types.Pair<IntWritable, CentroidWritable>>() { + @Override + public org.apache.hadoop.mrunit.types.Pair<IntWritable, CentroidWritable> apply( + org.apache.mahout.common.Pair<IntWritable, CentroidWritable> input) { + return new org.apache.hadoop.mrunit.types.Pair<IntWritable, CentroidWritable>( + input.getFirst(), input.getSecond()); + } + }))); + } + + /** Asserts that {@code results} holds exactly {@code 1 << NUM_DIMENSIONS} centroids keyed 0, 1, 2, ... in order, whose weights sum to {@code totalWeight}; centroids whose weight differs from the even share are only reported, not failed. */ private static void testReducerResults(int totalWeight, List<org.apache.hadoop.mrunit.types.Pair<IntWritable, + CentroidWritable>> results) { + int expectedNumClusters = 1 << NUM_DIMENSIONS; + double expectedWeight = (double) totalWeight / expectedNumClusters; + int numClusters = 0; + int numUnbalancedClusters = 0; + double totalReducerWeight = 0; /* centroid weights are doubles; an int accumulator would silently truncate each addition */ + for (org.apache.hadoop.mrunit.types.Pair<IntWritable, CentroidWritable> result : results) { + if (result.getSecond().getCentroid().getWeight() != expectedWeight) { + System.out.printf("Unbalanced weight %f in centroid %d\n", result.getSecond().getCentroid().getWeight(), + result.getSecond().getCentroid().getIndex()); + ++numUnbalancedClusters; + } + assertEquals("Final centroid index is invalid", numClusters, result.getFirst().get()); + totalReducerWeight += result.getSecond().getCentroid().getWeight(); + ++numClusters; + } + System.out.printf("%d clusters are unbalanced\n", numUnbalancedClusters); + assertEquals("Invalid total weight", totalWeight, totalReducerWeight, EPSILON); + assertEquals("Invalid number of clusters", expectedNumClusters, numClusters); + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/test/java/org/apache/mahout/clustering/streaming/tools/ResplitSequenceFilesTest.java ---------------------------------------------------------------------- diff --git a/mr/src/test/java/org/apache/mahout/clustering/streaming/tools/ResplitSequenceFilesTest.java b/mr/src/test/java/org/apache/mahout/clustering/streaming/tools/ResplitSequenceFilesTest.java new file mode 100644 index 0000000..2d790e5 --- /dev/null +++
b/mr/src/test/java/org/apache/mahout/clustering/streaming/tools/ResplitSequenceFilesTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.clustering.streaming.tools; + +import com.google.common.collect.Iterables; +import com.google.common.io.Closeables; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.mahout.common.HadoopUtil; +import org.apache.mahout.common.MahoutTestCase; +import org.apache.mahout.common.iterator.sequencefile.PathFilters; +import org.apache.mahout.common.iterator.sequencefile.PathType; +import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable; +import org.junit.Test; + +public class ResplitSequenceFilesTest extends MahoutTestCase { + + @Test + public void testSplitting() throws Exception { + + Path inputFile = new Path(getTestTempDirPath("input"), "test.seq"); + Path output = getTestTempDirPath("output"); + Configuration conf = getConfiguration(); /* the MahoutTestCase configuration, as the sibling tests use, instead of a bare default Configuration */ + LocalFileSystem fs =
FileSystem.getLocal(conf); + + SequenceFile.Writer writer = null; + try { + writer = SequenceFile.createWriter(fs, conf, inputFile, IntWritable.class, IntWritable.class); + writer.append(new IntWritable(1), new IntWritable(1)); + writer.append(new IntWritable(2), new IntWritable(2)); + writer.append(new IntWritable(3), new IntWritable(3)); + writer.append(new IntWritable(4), new IntWritable(4)); + writer.append(new IntWritable(5), new IntWritable(5)); + writer.append(new IntWritable(6), new IntWritable(6)); + writer.append(new IntWritable(7), new IntWritable(7)); + writer.append(new IntWritable(8), new IntWritable(8)); + } finally { + Closeables.close(writer, false); + } + + String splitPattern = "split"; + int numSplits = 4; + + ResplitSequenceFiles.main(new String[] { "--input", inputFile.toString(), + "--output", output.toString() + "/" + splitPattern, "--numSplits", String.valueOf(numSplits) }); + + FileStatus[] statuses = HadoopUtil.getFileStatus(output, PathType.LIST, PathFilters.logsCRCFilter(), null, conf); + + for (FileStatus status : statuses) { + String name = status.getPath().getName(); + assertTrue(name.startsWith(splitPattern)); + assertEquals(2, numEntries(status, conf)); + } + assertEquals(numSplits, statuses.length); + } + + /** Counts the records iterated from the sequence file at {@code status}'s path. */ private static int numEntries(FileStatus status, Configuration conf) { + return Iterables.size(new SequenceFileIterable<IntWritable, IntWritable>(status.getPath(), conf)); + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/test/java/org/apache/mahout/clustering/topdown/PathDirectoryTest.java ---------------------------------------------------------------------- diff --git a/mr/src/test/java/org/apache/mahout/clustering/topdown/PathDirectoryTest.java b/mr/src/test/java/org/apache/mahout/clustering/topdown/PathDirectoryTest.java new file mode 100644 index 0000000..66b66e3 --- /dev/null +++ b/mr/src/test/java/org/apache/mahout/clustering/topdown/PathDirectoryTest.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.mahout.clustering.topdown; + +import org.apache.hadoop.fs.Path; +import org.apache.mahout.common.MahoutTestCase; +import org.junit.Test; + +import java.io.File; + +public final class PathDirectoryTest extends MahoutTestCase { + + private final Path output = new Path("output"); + + @Test + public void shouldReturnTopLevelClusterPath() { + Path expectedPath = new Path(output, PathDirectory.TOP_LEVEL_CLUSTER_DIRECTORY); + assertEquals(expectedPath, PathDirectory.getTopLevelClusterPath(output)); + } + + @Test + public void shouldReturnClusterPostProcessorOutputDirectory() { + Path expectedPath = new Path(output, PathDirectory.POST_PROCESS_DIRECTORY); + assertEquals(expectedPath, PathDirectory.getClusterPostProcessorOutputDirectory(output)); + } + + @Test + public void shouldReturnClusterOutputClusteredPoints() { + Path expectedPath = new Path(output, PathDirectory.CLUSTERED_POINTS_DIRECTORY + File.separator + '*'); + assertEquals(expectedPath, PathDirectory.getClusterOutputClusteredPoints(output)); + } + + @Test + public void shouldReturnBottomLevelClusterPath() { + Path expectedPath = new Path(output + File.separator + + PathDirectory.BOTTOM_LEVEL_CLUSTER_DIRECTORY + File.separator + + '1');
+ assertEquals(expectedPath, PathDirectory.getBottomLevelClusterPath(output, "1")); + } + + @Test + public void shouldReturnClusterPathForClusterId() { + Path expectedPath = new Path(PathDirectory.getClusterPostProcessorOutputDirectory(output), new Path("1")); + assertEquals(expectedPath, PathDirectory.getClusterPathForClusterId( + PathDirectory.getClusterPostProcessorOutputDirectory(output), "1")); + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/test/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterCountReaderTest.java ---------------------------------------------------------------------- diff --git a/mr/src/test/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterCountReaderTest.java b/mr/src/test/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterCountReaderTest.java new file mode 100644 index 0000000..0934ff7 --- /dev/null +++ b/mr/src/test/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterCountReaderTest.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.mahout.clustering.topdown.postprocessor; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.mahout.clustering.Cluster; +import org.apache.mahout.clustering.ClusteringTestUtils; +import org.apache.mahout.clustering.canopy.CanopyDriver; +import org.apache.mahout.clustering.classify.WeightedVectorWritable; +import org.apache.mahout.clustering.kmeans.KMeansDriver; +import org.apache.mahout.common.DummyOutputCollector; +import org.apache.mahout.common.MahoutTestCase; +import org.apache.mahout.common.Pair; +import org.apache.mahout.common.distance.DistanceMeasure; +import org.apache.mahout.common.distance.ManhattanDistanceMeasure; +import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable; +import org.apache.mahout.math.RandomAccessSparseVector; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorWritable; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.Lists; + +public final class ClusterCountReaderTest extends MahoutTestCase { + + public static final double[][] REFERENCE = { {1, 1}, {2, 1}, {1, 2}, {4, 4}, {5, 4}, {4, 5}, {5, 5}}; + + private FileSystem fs; + private Path outputPathForCanopy; + private Path outputPathForKMeans; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + Configuration conf = getConfiguration(); + fs = FileSystem.get(conf); + } + + public static List<VectorWritable> getPointsWritable(double[][] raw) { + List<VectorWritable> points = Lists.newArrayList(); + for (double[] fr : raw) { + Vector vec = new RandomAccessSparseVector(fr.length); + vec.assign(fr); + points.add(new VectorWritable(vec)); + } + return points; + } + + /** + * Story: User wants to use cluster post processor after canopy clustering
and then run clustering on the + * output clusters + */ + @Test + public void testGetNumberOfClusters() throws Exception { + List<VectorWritable> points = getPointsWritable(REFERENCE); + + Path pointsPath = getTestTempDirPath("points"); + Configuration conf = getConfiguration(); + ClusteringTestUtils.writePointsToFile(points, new Path(pointsPath, "file1"), fs, conf); + ClusteringTestUtils.writePointsToFile(points, new Path(pointsPath, "file2"), fs, conf); + + outputPathForCanopy = getTestTempDirPath("canopy"); + outputPathForKMeans = getTestTempDirPath("kmeans"); + + topLevelClustering(pointsPath, conf); + + int numberOfClusters = ClusterCountReader.getNumberOfClusters(outputPathForKMeans, conf); + Assert.assertEquals(2, numberOfClusters); + verifyThatNumberOfClustersIsCorrect(conf, new Path(outputPathForKMeans, new Path("clusteredPoints"))); + + } + + private void topLevelClustering(Path pointsPath, Configuration conf) throws IOException, + InterruptedException, + ClassNotFoundException { + DistanceMeasure measure = new ManhattanDistanceMeasure(); + CanopyDriver.run(conf, pointsPath, outputPathForCanopy, measure, 4.0, 3.0, true, 0.0, true); + Path clustersIn = new Path(outputPathForCanopy, new Path(Cluster.CLUSTERS_DIR + '0' + + Cluster.FINAL_ITERATION_SUFFIX)); + KMeansDriver.run(conf, pointsPath, clustersIn, outputPathForKMeans, 1, 1, true, 0.0, true); + } + + private static void verifyThatNumberOfClustersIsCorrect(Configuration conf, Path clusteredPointsPath) { + DummyOutputCollector<IntWritable,WeightedVectorWritable> collector = + new DummyOutputCollector<IntWritable,WeightedVectorWritable>(); + + // The key is the clusterId, the value is the weighted vector + for (Pair<IntWritable,WeightedVectorWritable> record : + new SequenceFileIterable<IntWritable,WeightedVectorWritable>(new Path(clusteredPointsPath, "part-m-0"), + conf)) { + collector.collect(record.getFirst(), record.getSecond()); + } + int clusterSize = collector.getKeys().size(); + assertEquals(2,
+ clusterSize); + } + +}
