http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/test/java/org/apache/mahout/clustering/lda/cvb/TestCVBModelTrainer.java
----------------------------------------------------------------------
diff --git 
a/mr/src/test/java/org/apache/mahout/clustering/lda/cvb/TestCVBModelTrainer.java
 
b/mr/src/test/java/org/apache/mahout/clustering/lda/cvb/TestCVBModelTrainer.java
new file mode 100644
index 0000000..dd4360a
--- /dev/null
+++ 
b/mr/src/test/java/org/apache/mahout/clustering/lda/cvb/TestCVBModelTrainer.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.lda.cvb;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.mahout.clustering.ClusteringTestUtils;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.RandomUtils;
+import org.apache.mahout.math.Matrix;
+import org.apache.mahout.math.MatrixUtils;
+import org.apache.mahout.math.function.DoubleFunction;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+public final class TestCVBModelTrainer extends MahoutTestCase {
+
+  private static final double ETA = 0.1;
+  private static final double ALPHA = 0.1;
+
+  @Test
+  public void testInMemoryCVB0() throws Exception {
+    String[] terms = new String[26];
+    for (int i=0; i<terms.length; i++) {
+      terms[i] = String.valueOf((char) (i + 'a'));
+    }
+    int numGeneratingTopics = 3;
+    int numTerms = 26;
+    Matrix matrix = 
ClusteringTestUtils.randomStructuredModel(numGeneratingTopics, numTerms, new 
DoubleFunction() {
+      @Override public double apply(double d) {
+        return 1.0 / Math.pow(d + 1.0, 2);
+      }
+    });
+
+    int numDocs = 100;
+    int numSamples = 20;
+    int numTopicsPerDoc = 1;
+
+    Matrix sampledCorpus = ClusteringTestUtils.sampledCorpus(matrix, 
RandomUtils.getRandom(),
+                                                             numDocs, 
numSamples, numTopicsPerDoc);
+
+    List<Double> perplexities = Lists.newArrayList();
+    int numTrials = 1;
+    for (int numTestTopics = 1; numTestTopics < 2 * numGeneratingTopics; 
numTestTopics++) {
+      double[] perps = new double[numTrials];
+      for (int trial = 0; trial < numTrials; trial++) {
+        InMemoryCollapsedVariationalBayes0 cvb =
+          new InMemoryCollapsedVariationalBayes0(sampledCorpus, terms, 
numTestTopics, ALPHA, ETA, 2, 1, 0);
+        cvb.setVerbose(true);
+        perps[trial] = cvb.iterateUntilConvergence(0, 5, 0, 0.2);
+        System.out.println(perps[trial]);
+      }
+      Arrays.sort(perps);
+      System.out.println(Arrays.toString(perps));
+      perplexities.add(perps[0]);
+    }
+    System.out.println(Joiner.on(",").join(perplexities));
+  }
+
+  @Test
+  public void testRandomStructuredModelViaMR() throws Exception {
+    int numGeneratingTopics = 3;
+    int numTerms = 9;
+    Matrix matrix = 
ClusteringTestUtils.randomStructuredModel(numGeneratingTopics, numTerms, new 
DoubleFunction() {
+      @Override
+      public double apply(double d) {
+        return 1.0 / Math.pow(d + 1.0, 3);
+      }
+    });
+
+    int numDocs = 500;
+    int numSamples = 10;
+    int numTopicsPerDoc = 1;
+
+    Matrix sampledCorpus = ClusteringTestUtils.sampledCorpus(matrix, 
RandomUtils.getRandom(1234),
+                                                             numDocs, 
numSamples, numTopicsPerDoc);
+
+    Path sampleCorpusPath = getTestTempDirPath("corpus");
+    Configuration configuration = getConfiguration();
+    MatrixUtils.write(sampleCorpusPath, configuration, sampledCorpus);
+    int numIterations = 5;
+    List<Double> perplexities = Lists.newArrayList();
+    int startTopic = numGeneratingTopics - 1;
+    int numTestTopics = startTopic;
+    while (numTestTopics < numGeneratingTopics + 2) {
+      Path topicModelStateTempPath = getTestTempDirPath("topicTemp" + 
numTestTopics);
+      Configuration conf = getConfiguration();
+      CVB0Driver cvb0Driver = new CVB0Driver();
+      cvb0Driver.run(conf, sampleCorpusPath, null, numTestTopics, numTerms,
+          ALPHA, ETA, numIterations, 1, 0, null, null, 
topicModelStateTempPath, 1234, 0.2f, 2,
+          1, 3, 1, false);
+      perplexities.add(lowestPerplexity(conf, topicModelStateTempPath));
+      numTestTopics++;
+    }
+    int bestTopic = -1;
+    double lowestPerplexity = Double.MAX_VALUE;
+    for (int t = 0; t < perplexities.size(); t++) {
+      if (perplexities.get(t) < lowestPerplexity) {
+        lowestPerplexity = perplexities.get(t);
+        bestTopic = t + startTopic;
+      }
+    }
+    assertEquals("The optimal number of topics is not that of the generating 
distribution", 4, bestTopic);
+    System.out.println("Perplexities: " + Joiner.on(", ").join(perplexities));
+  }
+
+  private static double lowestPerplexity(Configuration conf, Path 
topicModelTemp)
+      throws IOException {
+    double lowest = Double.MAX_VALUE;
+    double current;
+    int iteration = 2;
+    while (!Double.isNaN(current = CVB0Driver.readPerplexity(conf, 
topicModelTemp, iteration))) {
+      lowest = Math.min(current, lowest);
+      iteration++;
+    }
+    return lowest;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/test/java/org/apache/mahout/clustering/spectral/TestAffinityMatrixInputJob.java
----------------------------------------------------------------------
diff --git 
a/mr/src/test/java/org/apache/mahout/clustering/spectral/TestAffinityMatrixInputJob.java
 
b/mr/src/test/java/org/apache/mahout/clustering/spectral/TestAffinityMatrixInputJob.java
new file mode 100644
index 0000000..6e0cd18
--- /dev/null
+++ 
b/mr/src/test/java/org/apache/mahout/clustering/spectral/TestAffinityMatrixInputJob.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.common.DummyRecordWriter;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.hadoop.DistributedRowMatrix.MatrixEntryWritable;
+import org.junit.Test;
+
+/**
+ * <p>Tests the affinity matrix input M/R task.</p>
+ * 
+ * <p>The tricky item with this task is that the format of the input
+ * must be correct; it must take the form of a graph input, and for the
+ * current implementation, the input must be symmetric, e.g. the weight
+ * from node A to B = the weight from node B to A. This is not explicitly
+ * enforced within the task itself (since, as of the time these tests were 
+ * written, we have not yet decided on a final rule regarding the 
+ * symmetry/non-symmetry of the affinity matrix, so we are unofficially 
+ * enforcing symmetry). Input looks something like this:</p>
+ * 
+ * <pre>0, 0, 0
+ * 0, 1, 10
+ * 0, 2, 20
+ * ...
+ * 1, 0, 10
+ * 2, 0, 20
+ * ...</pre>
+ * 
+ * <p>The mapper's task is simply to convert each line of text into a
+ * DistributedRowMatrix entry, allowing the reducer to join each entry
+ * of the same row into a VectorWritable.</p>
+ * 
+ * <p>Exceptions are thrown in cases of bad input format: if there are
+ * more or fewer than 3 numbers per line, or any of the numbers are missing.
+ */
+public class TestAffinityMatrixInputJob extends MahoutTestCase {
+  
+  private static final String [] RAW = {"0,0,0", "0,1,5", "0,2,10", "1,0,5", 
"1,1,0",
+                                        "1,2,20", "2,0,10", "2,1,20", "2,2,0"};
+  private static final int RAW_DIMENSIONS = 3;
+
+  @Test
+  public void testAffinityMatrixInputMapper() throws Exception {
+    AffinityMatrixInputMapper mapper = new AffinityMatrixInputMapper();
+    Configuration conf = getConfiguration();
+    conf.setInt(Keys.AFFINITY_DIMENSIONS, RAW_DIMENSIONS);
+    
+    // set up the dummy writer and the M/R context
+    DummyRecordWriter<IntWritable, MatrixEntryWritable> writer =
+      new DummyRecordWriter<IntWritable, MatrixEntryWritable>();
+    Mapper<LongWritable, Text, IntWritable, MatrixEntryWritable>.Context 
+      context = DummyRecordWriter.build(mapper, conf, writer);
+
+    // loop through all the points and test each one is converted
+    // successfully to a DistributedRowMatrix.MatrixEntry
+    for (String s : RAW) {
+      mapper.map(new LongWritable(), new Text(s), context);
+    }
+
+    // test the data was successfully constructed
+    assertEquals("Number of map results", RAW_DIMENSIONS, 
writer.getData().size());
+    Set<IntWritable> keys = writer.getData().keySet();
+    for (IntWritable i : keys) {
+      List<MatrixEntryWritable> row = writer.getData().get(i);
+      assertEquals("Number of items in row", RAW_DIMENSIONS, row.size());
+    }
+  }
+  
+  @Test
+  public void testAffinitymatrixInputReducer() throws Exception {
+    AffinityMatrixInputMapper mapper = new AffinityMatrixInputMapper();
+    Configuration conf = getConfiguration();
+    conf.setInt(Keys.AFFINITY_DIMENSIONS, RAW_DIMENSIONS);
+    
+    // set up the dummy writer and the M/R context
+    DummyRecordWriter<IntWritable, MatrixEntryWritable> mapWriter =
+      new DummyRecordWriter<IntWritable, MatrixEntryWritable>();
+    Mapper<LongWritable, Text, IntWritable, MatrixEntryWritable>.Context
+      mapContext = DummyRecordWriter.build(mapper, conf, mapWriter);
+
+    // loop through all the points and test each one is converted
+    // successfully to a DistributedRowMatrix.MatrixEntry
+    for (String s : RAW) {
+      mapper.map(new LongWritable(), new Text(s), mapContext);
+    }
+    // store the data for checking later
+    Map<IntWritable, List<MatrixEntryWritable>> map = mapWriter.getData();
+
+    // now reduce the data
+    AffinityMatrixInputReducer reducer = new AffinityMatrixInputReducer();
+    DummyRecordWriter<IntWritable, VectorWritable> redWriter = 
+      new DummyRecordWriter<IntWritable, VectorWritable>();
+    Reducer<IntWritable, MatrixEntryWritable,
+      IntWritable, VectorWritable>.Context redContext = DummyRecordWriter
+      .build(reducer, conf, redWriter, IntWritable.class, 
MatrixEntryWritable.class);
+    for (IntWritable key : mapWriter.getKeys()) {
+      reducer.reduce(key, mapWriter.getValue(key), redContext);
+    }
+    
+    // check that all the elements are correctly ordered
+    assertEquals("Number of reduce results", RAW_DIMENSIONS, 
redWriter.getData().size());
+    for (IntWritable row : redWriter.getKeys()) {
+      List<VectorWritable> list = redWriter.getValue(row);
+      assertEquals("Should only be one vector", 1, list.size());
+      // check that the elements in the array are correctly ordered
+      Vector v = list.get(0).get();
+      for (Vector.Element e : v.all()) {
+        // find this value in the original map
+        MatrixEntryWritable toCompare = new MatrixEntryWritable();
+        toCompare.setRow(-1);
+        toCompare.setCol(e.index());
+        toCompare.setVal(e.get());
+        assertTrue("This entry was correctly placed in its row", 
map.get(row).contains(toCompare));
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/test/java/org/apache/mahout/clustering/spectral/TestMatrixDiagonalizeJob.java
----------------------------------------------------------------------
diff --git 
a/mr/src/test/java/org/apache/mahout/clustering/spectral/TestMatrixDiagonalizeJob.java
 
b/mr/src/test/java/org/apache/mahout/clustering/spectral/TestMatrixDiagonalizeJob.java
new file mode 100644
index 0000000..7d4ec1f
--- /dev/null
+++ 
b/mr/src/test/java/org/apache/mahout/clustering/spectral/TestMatrixDiagonalizeJob.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import 
org.apache.mahout.clustering.spectral.MatrixDiagonalizeJob.MatrixDiagonalizeMapper;
+import 
org.apache.mahout.clustering.spectral.MatrixDiagonalizeJob.MatrixDiagonalizeReducer;
+import org.apache.mahout.common.DummyRecordWriter;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.junit.Test;
+
+/**
+ * <p>The MatrixDiagonalize task is pretty simple: given a matrix,
+ * it sums the elements of the row, and sticks the sum in position (i, i) 
+ * of a new matrix of identical dimensions to the original.</p>
+ */
+public class TestMatrixDiagonalizeJob extends MahoutTestCase {
+  
+  private static final double[][] RAW = { {1, 2, 3}, {4, 5, 6}, {7, 8, 9} };
+  private static final int RAW_DIMENSIONS = 3;
+  
+  private static double rowSum(double [] row) {
+    double sum = 0;
+    for (double r : row) {
+      sum += r;
+    }
+    return sum;
+  }
+
+  @Test
+  public void testMatrixDiagonalizeMapper() throws Exception {
+    MatrixDiagonalizeMapper mapper = new MatrixDiagonalizeMapper();
+    Configuration conf = getConfiguration();
+    conf.setInt(Keys.AFFINITY_DIMENSIONS, RAW_DIMENSIONS);
+    
+    // set up the dummy writers
+    DummyRecordWriter<NullWritable, IntDoublePairWritable> writer =
+      new DummyRecordWriter<NullWritable, IntDoublePairWritable>();
+    Mapper<IntWritable, VectorWritable, NullWritable, 
IntDoublePairWritable>.Context 
+      context = DummyRecordWriter.build(mapper, conf, writer);
+    
+    // perform the mapping
+    for (int i = 0; i < RAW_DIMENSIONS; i++) {
+      RandomAccessSparseVector toAdd = new 
RandomAccessSparseVector(RAW_DIMENSIONS);
+      toAdd.assign(RAW[i]);
+      mapper.map(new IntWritable(i), new VectorWritable(toAdd), context);
+    }
+    
+    // check the number of the results
+    assertEquals("Number of map results", RAW_DIMENSIONS,
+        writer.getValue(NullWritable.get()).size());
+  }
+  
+  @Test
+ public void testMatrixDiagonalizeReducer() throws Exception {
+    MatrixDiagonalizeMapper mapper = new MatrixDiagonalizeMapper();
+    Configuration conf = getConfiguration();
+    conf.setInt(Keys.AFFINITY_DIMENSIONS, RAW_DIMENSIONS);
+    
+    // set up the dummy writers
+    DummyRecordWriter<NullWritable, IntDoublePairWritable> mapWriter = 
+      new DummyRecordWriter<NullWritable, IntDoublePairWritable>();
+    Mapper<IntWritable, VectorWritable, NullWritable, 
IntDoublePairWritable>.Context 
+      mapContext = DummyRecordWriter.build(mapper, conf, mapWriter);
+    
+    // perform the mapping
+    for (int i = 0; i < RAW_DIMENSIONS; i++) {
+      RandomAccessSparseVector toAdd = new 
RandomAccessSparseVector(RAW_DIMENSIONS);
+      toAdd.assign(RAW[i]);
+      mapper.map(new IntWritable(i), new VectorWritable(toAdd), mapContext);
+    }
+    
+    // now perform the reduction
+    MatrixDiagonalizeReducer reducer = new MatrixDiagonalizeReducer();
+    DummyRecordWriter<NullWritable, VectorWritable> redWriter = new
+      DummyRecordWriter<NullWritable, VectorWritable>();
+    Reducer<NullWritable, IntDoublePairWritable, NullWritable, 
VectorWritable>.Context
+      redContext = DummyRecordWriter.build(reducer, conf, redWriter, 
+      NullWritable.class, IntDoublePairWritable.class);
+    
+    // only need one reduction
+    reducer.reduce(NullWritable.get(), mapWriter.getValue(NullWritable.get()), 
redContext);
+    
+    // first, make sure there's only one result
+    List<VectorWritable> list = redWriter.getValue(NullWritable.get());
+    assertEquals("Only a single resulting vector", 1, list.size());
+    Vector v = list.get(0).get();
+    for (int i = 0; i < v.size(); i++) {
+      assertEquals("Element sum is correct", rowSum(RAW[i]), v.get(i),0.01);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/test/java/org/apache/mahout/clustering/spectral/TestUnitVectorizerJob.java
----------------------------------------------------------------------
diff --git 
a/mr/src/test/java/org/apache/mahout/clustering/spectral/TestUnitVectorizerJob.java
 
b/mr/src/test/java/org/apache/mahout/clustering/spectral/TestUnitVectorizerJob.java
new file mode 100644
index 0000000..f317f6e
--- /dev/null
+++ 
b/mr/src/test/java/org/apache/mahout/clustering/spectral/TestUnitVectorizerJob.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import 
org.apache.mahout.clustering.spectral.UnitVectorizerJob.UnitVectorizerMapper;
+import org.apache.mahout.common.DummyRecordWriter;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.junit.Test;
+
+public class TestUnitVectorizerJob extends MahoutTestCase {
+
+  private static final double [][] RAW = { {1, 2, 3}, {4, 5, 6}, {7, 8, 9} };
+
+  @Test
+  public void testUnitVectorizerMapper() throws Exception {
+    UnitVectorizerMapper mapper = new UnitVectorizerMapper();
+    Configuration conf = getConfiguration();
+    
+    // set up the dummy writers
+    DummyRecordWriter<IntWritable, VectorWritable> writer = new
+      DummyRecordWriter<IntWritable, VectorWritable>();
+    Mapper<IntWritable, VectorWritable, IntWritable, VectorWritable>.Context 
+      context = DummyRecordWriter.build(mapper, conf, writer);
+    
+    // perform the mapping
+    for (int i = 0; i < RAW.length; i++) {
+      Vector vector = new RandomAccessSparseVector(RAW[i].length);
+      vector.assign(RAW[i]);
+      mapper.map(new IntWritable(i), new VectorWritable(vector), context);
+    }
+    
+    // check the results
+    assertEquals("Number of map results", RAW.length, writer.getData().size());
+    for (int i = 0; i < RAW.length; i++) {
+      IntWritable key = new IntWritable(i);
+      List<VectorWritable> list = writer.getValue(key);
+      assertEquals("Only one element per row", 1, list.size());
+      Vector v = list.get(0).get();
+      assertTrue("Unit vector sum is 1 or differs by 0.0001", 
Math.abs(v.norm(2) - 1) < 0.000001);
+    }
+  } 
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/test/java/org/apache/mahout/clustering/spectral/TestVectorCache.java
----------------------------------------------------------------------
diff --git 
a/mr/src/test/java/org/apache/mahout/clustering/spectral/TestVectorCache.java 
b/mr/src/test/java/org/apache/mahout/clustering/spectral/TestVectorCache.java
new file mode 100644
index 0000000..9091efe
--- /dev/null
+++ 
b/mr/src/test/java/org/apache/mahout/clustering/spectral/TestVectorCache.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral;
+
+import java.net.URI;
+
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.MahoutTestCase;
+import 
org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterator;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.junit.Test;
+
+public class TestVectorCache extends MahoutTestCase {
+
+  private static final double [] VECTOR = { 1, 2, 3, 4 };
+  
+  @Test
+  public void testSave() throws Exception {
+    Configuration conf = getConfiguration();
+    Writable key = new IntWritable(0);
+    Vector value = new DenseVector(VECTOR);
+    Path path = getTestTempDirPath("output");
+    
+    // write the vector out
+    VectorCache.save(key, value, path, conf, true, true);
+    
+    // can we read it from here?
+    SequenceFileValueIterator<VectorWritable> iterator =
+        new SequenceFileValueIterator<VectorWritable>(path, true, conf);
+    try {
+      VectorWritable old = iterator.next();
+      // test if the values are identical
+      assertEquals("Saved vector is identical to original", old.get(), value);
+    } finally {
+      Closeables.close(iterator, true);
+    }
+  }
+  
+  @Test
+  public void testLoad() throws Exception {
+    // save a vector manually
+    Configuration conf = getConfiguration();
+    Writable key = new IntWritable(0);
+    Vector value = new DenseVector(VECTOR);
+    Path path = getTestTempDirPath("output");
+
+    FileSystem fs = FileSystem.get(path.toUri(), conf);
+    // write the vector
+    path = fs.makeQualified(path);
+    fs.deleteOnExit(path);
+    HadoopUtil.delete(conf, path);
+    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path, 
IntWritable.class, VectorWritable.class);
+    try {
+      writer.append(key, new VectorWritable(value));
+    } finally {
+      Closeables.close(writer, false);
+    }
+    DistributedCache.setCacheFiles(new URI[] {path.toUri()}, conf);
+
+    // load it
+    Vector result = VectorCache.load(conf);
+    
+    // are they the same?
+    assertNotNull("Vector is null", result);
+    assertEquals("Loaded vector is not identical to original", result, value);
+  }
+  
+  @Test
+  public void testAll() throws Exception {
+    Configuration conf = getConfiguration();
+    Vector v = new DenseVector(VECTOR);
+    Path toSave = getTestTempDirPath("output");
+    Writable key = new IntWritable(0);
+    
+    // save it
+    VectorCache.save(key, v, toSave, conf);
+    
+    // now, load it back
+    Vector v2 = VectorCache.load(conf);
+    
+    // are they the same?
+    assertNotNull("Vector is null", v2);
+    assertEquals("Vectors are not identical", v2, v);
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/test/java/org/apache/mahout/clustering/spectral/TestVectorMatrixMultiplicationJob.java
----------------------------------------------------------------------
diff --git 
a/mr/src/test/java/org/apache/mahout/clustering/spectral/TestVectorMatrixMultiplicationJob.java
 
b/mr/src/test/java/org/apache/mahout/clustering/spectral/TestVectorMatrixMultiplicationJob.java
new file mode 100644
index 0000000..2fd83e2
--- /dev/null
+++ 
b/mr/src/test/java/org/apache/mahout/clustering/spectral/TestVectorMatrixMultiplicationJob.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import 
org.apache.mahout.clustering.spectral.VectorMatrixMultiplicationJob.VectorMatrixMultiplicationMapper;
+import org.apache.mahout.common.DummyRecordWriter;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.junit.Test;
+
+/**
+ * <p>This test ensures that a Vector can be successfully multiplied
+ * with a matrix.</p>
+ */
+public class TestVectorMatrixMultiplicationJob extends MahoutTestCase {
+  
+  private static final double [][] MATRIX = { {1, 1}, {2, 3} };
+  private static final double [] VECTOR = {9, 16};
+  
+  @Test
+  public void testVectorMatrixMultiplicationMapper() throws Exception {
+    VectorMatrixMultiplicationMapper mapper = new 
VectorMatrixMultiplicationMapper();
+    Configuration conf = getConfiguration();
+    
+    // set up all the parameters for the job
+    Vector toSave = new DenseVector(VECTOR);
+    DummyRecordWriter<IntWritable, VectorWritable> writer = new 
+      DummyRecordWriter<IntWritable, VectorWritable>();
+    Mapper<IntWritable, VectorWritable, IntWritable, VectorWritable>.Context
+      context = DummyRecordWriter.build(mapper, conf, writer);
+    mapper.setup(toSave);
+    
+    // run the job
+    for (int i = 0; i < MATRIX.length; i++) {
+      Vector v = new RandomAccessSparseVector(MATRIX[i].length);
+      v.assign(MATRIX[i]);
+      mapper.map(new IntWritable(i), new VectorWritable(v), context);
+    }
+    
+    // check the results
+    assertEquals("Number of map results", MATRIX.length, 
writer.getData().size());
+    for (int i = 0; i < MATRIX.length; i++) {
+      List<VectorWritable> list = writer.getValue(new IntWritable(i));
+      assertEquals("Only one vector per key", 1, list.size());
+      Vector v = list.get(0).get();
+      for (int j = 0; j < MATRIX[i].length; j++) {
+        double total = Math.sqrt(VECTOR[i]) * Math.sqrt(VECTOR[j]) * 
MATRIX[i][j];
+        assertEquals("Product matrix elements", total, v.get(j),EPSILON);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/test/java/org/apache/mahout/clustering/spectral/kmeans/TestEigenSeedGenerator.java
----------------------------------------------------------------------
diff --git 
a/mr/src/test/java/org/apache/mahout/clustering/spectral/kmeans/TestEigenSeedGenerator.java
 
b/mr/src/test/java/org/apache/mahout/clustering/spectral/kmeans/TestEigenSeedGenerator.java
new file mode 100644
index 0000000..4075fe4
--- /dev/null
+++ 
b/mr/src/test/java/org/apache/mahout/clustering/spectral/kmeans/TestEigenSeedGenerator.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral.kmeans;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.ClusteringTestUtils;
+import org.apache.mahout.clustering.iterator.ClusterWritable;
+import org.apache.mahout.clustering.spectral.kmeans.EigenSeedGenerator;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.distance.ManhattanDistanceMeasure;
+import 
org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterable;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public final class TestEigenSeedGenerator extends MahoutTestCase {
+
+  private
+   static final double[][] RAW = {{1, 0, 0}, {1, 0, 0}, {0, 1, 0}, {0, 1, 0},
+                                  {0, 1, 0}, {0, 0, 1}, {0, 0, 1}};
+
+  private FileSystem fs;
+
+  private static List<VectorWritable> getPoints() {
+    List<VectorWritable> points = Lists.newArrayList();
+    for (double[] fr : RAW) {
+      Vector vec = new RandomAccessSparseVector(fr.length);
+      vec.assign(fr);
+      points.add(new VectorWritable(vec));
+    }
+    return points;
+  }
+
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    Configuration conf = getConfiguration();
+    fs = FileSystem.get(conf);
+  }
+
+  @Test
+  public void testEigenSeedGenerator() throws Exception {
+    List<VectorWritable> points = getPoints();
+    Job job = new Job();
+    Configuration conf = job.getConfiguration();
+    job.setMapOutputValueClass(VectorWritable.class);
+    Path input = getTestTempFilePath("eigen-input");
+    Path output = getTestTempDirPath("eigen-output");
+    ClusteringTestUtils.writePointsToFile(points, input, fs, conf);
+
+    EigenSeedGenerator.buildFromEigens(conf, input, output, 3, new 
ManhattanDistanceMeasure());
+
+    int clusterCount = 0;
+    Collection<Integer> set = new HashSet<Integer>();
+    Vector v[] = new Vector[3];
+    for (ClusterWritable clusterWritable :
+         new SequenceFileValueIterable<ClusterWritable>(
+             new Path(output, "part-eigenSeed"), true, conf)) {
+      Cluster cluster = clusterWritable.getValue();
+      int id = cluster.getId();
+      assertTrue(set.add(id)); // validate unique id's
+      v[id] = cluster.getCenter();
+      clusterCount++;
+    }
+    assertEquals(3, clusterCount); // validate sample count
+    // validate pair-wise orthogonality
+    assertEquals(0, v[0].dot(v[1]), 1E-10);
+    assertEquals(0, v[1].dot(v[2]), 1E-10);
+    assertEquals(0, v[0].dot(v[2]), 1E-10);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/test/java/org/apache/mahout/clustering/streaming/cluster/BallKMeansTest.java
----------------------------------------------------------------------
diff --git 
a/mr/src/test/java/org/apache/mahout/clustering/streaming/cluster/BallKMeansTest.java
 
b/mr/src/test/java/org/apache/mahout/clustering/streaming/cluster/BallKMeansTest.java
new file mode 100644
index 0000000..340ca8e
--- /dev/null
+++ 
b/mr/src/test/java/org/apache/mahout/clustering/streaming/cluster/BallKMeansTest.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.streaming.cluster;
+
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.apache.mahout.clustering.ClusteringUtils;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.RandomUtils;
+import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
+import org.apache.mahout.common.distance.SquaredEuclideanDistanceMeasure;
+import org.apache.mahout.math.Centroid;
+import org.apache.mahout.math.ConstantVector;
+import org.apache.mahout.math.DenseMatrix;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Matrix;
+import org.apache.mahout.math.SingularValueDecomposition;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.WeightedVector;
+import org.apache.mahout.math.function.Functions;
+import org.apache.mahout.math.function.VectorFunction;
+import org.apache.mahout.math.neighborhood.BruteSearch;
+import org.apache.mahout.math.neighborhood.Searcher;
+import org.apache.mahout.math.neighborhood.UpdatableSearcher;
+import org.apache.mahout.math.random.MultiNormal;
+import org.apache.mahout.math.random.WeightedThing;
+import org.apache.mahout.math.stats.OnlineSummarizer;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.apache.mahout.clustering.ClusteringUtils.totalWeight;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class BallKMeansTest {
+  private static final int NUM_DATA_POINTS = 10000;
+  private static final int NUM_DIMENSIONS = 4;
+  private static final int NUM_ITERATIONS = 20;
+  private static final double DISTRIBUTION_RADIUS = 0.01;
+
+  @BeforeClass
+  public static void setUp() {
+    RandomUtils.useTestSeed();
+    syntheticData = DataUtils.sampleMultiNormalHypercube(NUM_DIMENSIONS, 
NUM_DATA_POINTS, DISTRIBUTION_RADIUS);
+
+  }
+
+  private static Pair<List<Centroid>, List<Centroid>> syntheticData;
+  private static final int K1 = 100;
+
+
+  @Test
+  public void testClusteringMultipleRuns() {
+    for (int i = 1; i <= 10; ++i) {
+      BallKMeans clusterer = new BallKMeans(new BruteSearch(new 
SquaredEuclideanDistanceMeasure()),
+          1 << NUM_DIMENSIONS, NUM_ITERATIONS, true, i);
+      clusterer.cluster(syntheticData.getFirst());
+      double costKMeansPlusPlus = 
ClusteringUtils.totalClusterCost(syntheticData.getFirst(), clusterer);
+
+      clusterer = new BallKMeans(new BruteSearch(new 
SquaredEuclideanDistanceMeasure()),
+          1 << NUM_DIMENSIONS, NUM_ITERATIONS, false, i);
+      clusterer.cluster(syntheticData.getFirst());
+      double costKMeansRandom = 
ClusteringUtils.totalClusterCost(syntheticData.getFirst(), clusterer);
+
+      System.out.printf("%d runs; kmeans++: %f; random: %f\n", i, 
costKMeansPlusPlus, costKMeansRandom);
+      assertTrue("kmeans++ cost should be less than random cost", 
costKMeansPlusPlus < costKMeansRandom);
+    }
+  }
+
+  @Test
+  public void testClustering() {
+    UpdatableSearcher searcher = new BruteSearch(new 
SquaredEuclideanDistanceMeasure());
+    BallKMeans clusterer = new BallKMeans(searcher, 1 << NUM_DIMENSIONS, 
NUM_ITERATIONS);
+
+    long startTime = System.currentTimeMillis();
+    Pair<List<Centroid>, List<Centroid>> data = syntheticData;
+    clusterer.cluster(data.getFirst());
+    long endTime = System.currentTimeMillis();
+
+    long hash = 0;
+    for (Centroid centroid : data.getFirst()) {
+      for (Vector.Element element : centroid.all()) {
+        hash = 31 * hash + 17 * element.index() + 
Double.toHexString(element.get()).hashCode();
+      }
+    }
+    System.out.printf("Hash = %08x\n", hash);
+
+    assertEquals("Total weight not preserved", 
totalWeight(syntheticData.getFirst()), totalWeight(clusterer), 1.0e-9);
+
+    // Verify that each corner of the cube has a centroid very nearby.
+    // This is probably FALSE for large-dimensional spaces!
+    OnlineSummarizer summarizer = new OnlineSummarizer();
+    for (Vector mean : syntheticData.getSecond()) {
+      WeightedThing<Vector> v = searcher.search(mean, 1).get(0);
+      summarizer.add(v.getWeight());
+    }
+    assertTrue(String.format("Median weight [%f] too large [>%f]", 
summarizer.getMedian(),
+        DISTRIBUTION_RADIUS), summarizer.getMedian() < DISTRIBUTION_RADIUS);
+
+    double clusterTime = (endTime - startTime) / 1000.0;
+    System.out.printf("%s\n%.2f for clustering\n%.1f us per row\n\n",
+        searcher.getClass().getName(), clusterTime,
+        clusterTime / syntheticData.getFirst().size() * 1.0e6);
+
+    // Verify that the total weight of the centroids near each corner is 
correct.
+    double[] cornerWeights = new double[1 << NUM_DIMENSIONS];
+    Searcher trueFinder = new BruteSearch(new EuclideanDistanceMeasure());
+    for (Vector trueCluster : syntheticData.getSecond()) {
+      trueFinder.add(trueCluster);
+    }
+    for (Centroid centroid : clusterer) {
+      WeightedThing<Vector> closest = trueFinder.search(centroid, 1).get(0);
+      cornerWeights[((Centroid)closest.getValue()).getIndex()] += 
centroid.getWeight();
+    }
+    int expectedNumPoints = NUM_DATA_POINTS / (1 << NUM_DIMENSIONS);
+    for (double v : cornerWeights) {
+      System.out.printf("%f ", v);
+    }
+    System.out.println();
+    for (double v : cornerWeights) {
+      assertEquals(expectedNumPoints, v, 0);
+    }
+  }
+
+  @Test
+  public void testInitialization() {
+    // Start with super clusterable data.
+    List<? extends WeightedVector> data = cubishTestData(0.01);
+
+    // Just do initialization of ball k-means. This should drop a point into 
each of the clusters.
+    BallKMeans r = new BallKMeans(new BruteSearch(new 
SquaredEuclideanDistanceMeasure()), 6, 20);
+    r.cluster(data);
+
+    // Put the centroids into a matrix.
+    Matrix x = new DenseMatrix(6, 5);
+    int row = 0;
+    for (Centroid c : r) {
+      x.viewRow(row).assign(c.viewPart(0, 5));
+      row++;
+    }
+
+    // Verify that each column looks right. Should contain zeros except for a 
single 6.
+    final Vector columnNorms = x.aggregateColumns(new VectorFunction() {
+      @Override
+      public double apply(Vector f) {
+        // Return the sum of three discrepancy measures.
+        return Math.abs(f.minValue()) + Math.abs(f.maxValue() - 6) + 
Math.abs(f.norm(1) - 6);
+      }
+    });
+    // Verify all errors are nearly zero.
+    assertEquals(0, columnNorms.norm(1) / columnNorms.size(), 0.1);
+
+    // Verify that the centroids are a permutation of the original ones.
+    SingularValueDecomposition svd = new SingularValueDecomposition(x);
+    Vector s = svd.getS().viewDiagonal().assign(Functions.div(6));
+    assertEquals(5, s.getLengthSquared(), 0.05);
+    assertEquals(5, s.norm(1), 0.05);
+  }
+
+  private static List<? extends WeightedVector> cubishTestData(double radius) {
+    List<WeightedVector> data = Lists.newArrayListWithCapacity(K1 + 5000);
+    int row = 0;
+
+    MultiNormal g = new MultiNormal(radius, new ConstantVector(0, 10));
+    for (int i = 0; i < K1; i++) {
+      data.add(new WeightedVector(g.sample(), 1, row++));
+    }
+
+    for (int i = 0; i < 5; i++) {
+      Vector m = new DenseVector(10);
+      m.set(i, 6); // This was originally i == 0 ? 6 : 6 which can't be right
+      MultiNormal gx = new MultiNormal(radius, m);
+      for (int j = 0; j < 1000; j++) {
+        data.add(new WeightedVector(gx.sample(), 1, row++));
+      }
+    }
+    return data;
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/test/java/org/apache/mahout/clustering/streaming/cluster/DataUtils.java
----------------------------------------------------------------------
diff --git 
a/mr/src/test/java/org/apache/mahout/clustering/streaming/cluster/DataUtils.java
 
b/mr/src/test/java/org/apache/mahout/clustering/streaming/cluster/DataUtils.java
new file mode 100644
index 0000000..2257541
--- /dev/null
+++ 
b/mr/src/test/java/org/apache/mahout/clustering/streaming/cluster/DataUtils.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.streaming.cluster;
+
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.math.Centroid;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.random.MultiNormal;
+
+/**
+ * A collection of miscellaneous utility functions for working with data to be 
clustered.
+ * Includes methods for generating synthetic data and estimating distance 
cutoff.
+ */
+public final class DataUtils {
+  private DataUtils() {
+  }
+
+  /**
+   * Samples numDatapoints vectors of numDimensions cardinality centered 
around the vertices of a
+   * numDimensions order hypercube. The distribution of points around these 
vertices is
+   * multinormal with a radius of distributionRadius.
+   * A hypercube of numDimensions has 2^numDimensions vertices. Keep this in 
mind when clustering
+   * the data.
+   *
+   * Note that it is almost always the case that you want to call 
RandomUtils.useTestSeed() before
+   * generating test data.  This means that you can't generate data in the 
declaration of a static
+   * variable because such initializations happen before any @BeforeClass or 
@Before setup methods
+   * are called.
+   *
+   *
+   * @param numDimensions number of dimensions of the vectors to be generated.
+   * @param numDatapoints number of data points to be generated.
+   * @param distributionRadius radius of the distribution around the hypercube 
vertices.
+   * @return a pair of lists, whose first element is the sampled points and 
whose second element
+   * is the list of hypercube vertices that are the means of each distribution.
+   */
+  public static Pair<List<Centroid>, List<Centroid>> 
sampleMultiNormalHypercube(
+      int numDimensions, int numDatapoints, double distributionRadius) {
+    int pow2N = 1 << numDimensions;
+    // Construct data samplers centered on the corners of a unit hypercube.
+    // Additionally, keep the means of the distributions that will be 
generated so we can compare
+    // these to the ideal cluster centers.
+    List<Centroid> mean = Lists.newArrayListWithCapacity(pow2N);
+    List<MultiNormal> rowSamplers = Lists.newArrayList();
+    for (int i = 0; i < pow2N; i++) {
+      Vector v = new DenseVector(numDimensions);
+      // Select each of the num
+      int pow2J = 1 << (numDimensions - 1);
+      for (int j = 0; j < numDimensions; ++j) {
+        v.set(j, 1.0 / pow2J * (i & pow2J));
+        pow2J >>= 1;
+      }
+      mean.add(new Centroid(i, v, 1));
+      rowSamplers.add(new MultiNormal(distributionRadius, v));
+    }
+
+    // Sample the requested number of data points.
+    List<Centroid> data = Lists.newArrayListWithCapacity(numDatapoints);
+    for (int i = 0; i < numDatapoints; ++i) {
+      data.add(new Centroid(i, rowSamplers.get(i % pow2N).sample(), 1));
+    }
+    return new Pair<List<Centroid>, List<Centroid>>(data, mean);
+  }
+
+  /**
+   * Calls sampleMultinormalHypercube(numDimension, numDataPoints, 0.01).
+   * @see DataUtils#sampleMultiNormalHypercube(int, int, double)
+   */
+  public static Pair<List<Centroid>, List<Centroid>> 
sampleMultiNormalHypercube(int numDimensions,
+                                                                               
 int numDatapoints) {
+    return sampleMultiNormalHypercube(numDimensions, numDatapoints, 0.01);
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/test/java/org/apache/mahout/clustering/streaming/cluster/StreamingKMeansTest.java
----------------------------------------------------------------------
diff --git 
a/mr/src/test/java/org/apache/mahout/clustering/streaming/cluster/StreamingKMeansTest.java
 
b/mr/src/test/java/org/apache/mahout/clustering/streaming/cluster/StreamingKMeansTest.java
new file mode 100644
index 0000000..cf9263c
--- /dev/null
+++ 
b/mr/src/test/java/org/apache/mahout/clustering/streaming/cluster/StreamingKMeansTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.streaming.cluster;
+
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.mahout.clustering.ClusteringUtils;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.RandomUtils;
+import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
+import org.apache.mahout.common.distance.SquaredEuclideanDistanceMeasure;
+import org.apache.mahout.math.Centroid;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.neighborhood.BruteSearch;
+import org.apache.mahout.math.neighborhood.FastProjectionSearch;
+import org.apache.mahout.math.neighborhood.ProjectionSearch;
+import org.apache.mahout.math.neighborhood.Searcher;
+import org.apache.mahout.math.neighborhood.UpdatableSearcher;
+import org.apache.mahout.math.random.WeightedThing;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.runners.Parameterized.Parameters;
+
+
+@RunWith(Parameterized.class)
+public class StreamingKMeansTest {
+  private static final int NUM_DATA_POINTS = 1 << 16;
+  private static final int NUM_DIMENSIONS = 6;
+  private static final int NUM_PROJECTIONS = 2;
+  private static final int SEARCH_SIZE = 10;
+
+  private static Pair<List<Centroid>, List<Centroid>> syntheticData ;
+
+  @Before
+  public void setUp() {
+    RandomUtils.useTestSeed();
+    syntheticData =
+      DataUtils.sampleMultiNormalHypercube(NUM_DIMENSIONS, NUM_DATA_POINTS);
+  }
+
+  private UpdatableSearcher searcher;
+  private boolean allAtOnce;
+
+  public StreamingKMeansTest(UpdatableSearcher searcher, boolean allAtOnce) {
+    this.searcher = searcher;
+    this.allAtOnce = allAtOnce;
+  }
+
+  @Parameters
+  public static List<Object[]> generateData() {
+    return Arrays.asList(new Object[][] {
+        {new ProjectionSearch(new SquaredEuclideanDistanceMeasure(), 
NUM_PROJECTIONS, SEARCH_SIZE), true},
+        {new FastProjectionSearch(new SquaredEuclideanDistanceMeasure(), 
NUM_PROJECTIONS, SEARCH_SIZE),
+            true},
+        {new ProjectionSearch(new SquaredEuclideanDistanceMeasure(), 
NUM_PROJECTIONS, SEARCH_SIZE), false},
+        {new FastProjectionSearch(new SquaredEuclideanDistanceMeasure(), 
NUM_PROJECTIONS, SEARCH_SIZE),
+            false},
+    });
+  }
+
+  @Test
+  public void testAverageDistanceCutoff() {
+    double avgDistanceCutoff = 0;
+    double avgNumClusters = 0;
+    int numTests = 1;
+    System.out.printf("Distance cutoff for %s\n", 
searcher.getClass().getName());
+    for (int i = 0; i < numTests; ++i) {
+      searcher.clear();
+      int numStreamingClusters = 
(int)Math.log(syntheticData.getFirst().size()) * (1 <<
+          NUM_DIMENSIONS);
+      double distanceCutoff = 1.0e-6;
+      double estimatedCutoff = 
ClusteringUtils.estimateDistanceCutoff(syntheticData.getFirst(),
+          searcher.getDistanceMeasure(), 100);
+      System.out.printf("[%d] Generated synthetic data [magic] %f [estimate] 
%f\n", i, distanceCutoff, estimatedCutoff);
+      StreamingKMeans clusterer =
+          new StreamingKMeans(searcher, numStreamingClusters, estimatedCutoff);
+      clusterer.cluster(syntheticData.getFirst());
+      avgDistanceCutoff += clusterer.getDistanceCutoff();
+      avgNumClusters += clusterer.getNumClusters();
+      System.out.printf("[%d] %f\n", i, clusterer.getDistanceCutoff());
+    }
+    avgDistanceCutoff /= numTests;
+    avgNumClusters /= numTests;
+    System.out.printf("Final: distanceCutoff: %f estNumClusters: %f\n", 
avgDistanceCutoff, avgNumClusters);
+  }
+
+  @Test
+  public void testClustering() {
+    searcher.clear();
+    int numStreamingClusters = (int)Math.log(syntheticData.getFirst().size()) 
* (1 << NUM_DIMENSIONS);
+    System.out.printf("k log n = %d\n", numStreamingClusters);
+    double estimatedCutoff = 
ClusteringUtils.estimateDistanceCutoff(syntheticData.getFirst(),
+        searcher.getDistanceMeasure(), 100);
+    StreamingKMeans clusterer =
+        new StreamingKMeans(searcher, numStreamingClusters, estimatedCutoff);
+
+    long startTime = System.currentTimeMillis();
+    if (allAtOnce) {
+      clusterer.cluster(syntheticData.getFirst());
+    } else {
+      for (Centroid datapoint : syntheticData.getFirst()) {
+        clusterer.cluster(datapoint);
+      }
+    }
+    long endTime = System.currentTimeMillis();
+
+    System.out.printf("%s %s\n", searcher.getClass().getName(), 
searcher.getDistanceMeasure()
+        .getClass().getName());
+    System.out.printf("Total number of clusters %d\n", 
clusterer.getNumClusters());
+
+    System.out.printf("Weights: %f %f\n", 
ClusteringUtils.totalWeight(syntheticData.getFirst()),
+        ClusteringUtils.totalWeight(clusterer));
+    assertEquals("Total weight not preserved", 
ClusteringUtils.totalWeight(syntheticData.getFirst()),
+        ClusteringUtils.totalWeight(clusterer), 1.0e-9);
+
+    // and verify that each corner of the cube has a centroid very nearby
+    double maxWeight = 0;
+    for (Vector mean : syntheticData.getSecond()) {
+      WeightedThing<Vector> v = searcher.search(mean, 1).get(0);
+      maxWeight = Math.max(v.getWeight(), maxWeight);
+    }
+    assertTrue("Maximum weight too large " + maxWeight, maxWeight < 0.05);
+    double clusterTime = (endTime - startTime) / 1000.0;
+    System.out.printf("%s\n%.2f for clustering\n%.1f us per row\n\n",
+        searcher.getClass().getName(), clusterTime,
+        clusterTime / syntheticData.getFirst().size() * 1.0e6);
+
+    // verify that the total weight of the centroids near each corner is 
correct
+    double[] cornerWeights = new double[1 << NUM_DIMENSIONS];
+    Searcher trueFinder = new BruteSearch(new EuclideanDistanceMeasure());
+    for (Vector trueCluster : syntheticData.getSecond()) {
+      trueFinder.add(trueCluster);
+    }
+    for (Centroid centroid : clusterer) {
+      WeightedThing<Vector> closest = trueFinder.search(centroid, 1).get(0);
+      cornerWeights[((Centroid)closest.getValue()).getIndex()] += 
centroid.getWeight();
+    }
+    int expectedNumPoints = NUM_DATA_POINTS / (1 << NUM_DIMENSIONS);
+    for (double v : cornerWeights) {
+      System.out.printf("%f ", v);
+    }
+    System.out.println();
+    for (double v : cornerWeights) {
+      assertEquals(expectedNumPoints, v, 0);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/test/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansTestMR.java
----------------------------------------------------------------------
diff --git 
a/mr/src/test/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansTestMR.java
 
b/mr/src/test/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansTestMR.java
new file mode 100644
index 0000000..9b582b4
--- /dev/null
+++ 
b/mr/src/test/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansTestMR.java
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.streaming.mapreduce;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mrunit.mapreduce.MapDriver;
+import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
+import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
+import org.apache.mahout.clustering.ClusteringUtils;
+import org.apache.mahout.clustering.streaming.cluster.DataUtils;
+import org.apache.mahout.clustering.streaming.cluster.StreamingKMeans;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.RandomUtils;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.distance.SquaredEuclideanDistanceMeasure;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
+import org.apache.mahout.math.Centroid;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.neighborhood.BruteSearch;
+import org.apache.mahout.math.neighborhood.FastProjectionSearch;
+import org.apache.mahout.math.neighborhood.LocalitySensitiveHashSearch;
+import org.apache.mahout.math.neighborhood.ProjectionSearch;
+import org.apache.mahout.math.random.WeightedThing;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class StreamingKMeansTestMR extends MahoutTestCase {
+  private static final int NUM_DATA_POINTS = 1 << 15;
+  private static final int NUM_DIMENSIONS = 8;
+  private static final int NUM_PROJECTIONS = 3;
+  private static final int SEARCH_SIZE = 5;
+  private static final int MAX_NUM_ITERATIONS = 10;
+  private static final double DISTANCE_CUTOFF = 1.0e-6;
+
+  private static Pair<List<Centroid>, List<Centroid>> syntheticData;
+
+  @Before
+  public void setUp() {
+    RandomUtils.useTestSeed();
+    syntheticData =
+      DataUtils.sampleMultiNormalHypercube(NUM_DIMENSIONS, NUM_DATA_POINTS, 
1.0e-4);
+  }
+
+  private final String searcherClassName;
+  private final String distanceMeasureClassName;
+
+  public StreamingKMeansTestMR(String searcherClassName, String 
distanceMeasureClassName) {
+    this.searcherClassName = searcherClassName;
+    this.distanceMeasureClassName = distanceMeasureClassName;
+  }
+
+  private void configure(Configuration configuration) {
+    configuration.set(DefaultOptionCreator.DISTANCE_MEASURE_OPTION, 
distanceMeasureClassName);
+    configuration.setInt(StreamingKMeansDriver.SEARCH_SIZE_OPTION, 
SEARCH_SIZE);
+    configuration.setInt(StreamingKMeansDriver.NUM_PROJECTIONS_OPTION, 
NUM_PROJECTIONS);
+    configuration.set(StreamingKMeansDriver.SEARCHER_CLASS_OPTION, 
searcherClassName);
+    configuration.setInt(DefaultOptionCreator.NUM_CLUSTERS_OPTION, 1 << 
NUM_DIMENSIONS);
+    configuration.setInt(StreamingKMeansDriver.ESTIMATED_NUM_MAP_CLUSTERS,
+        (1 << NUM_DIMENSIONS) * (int)Math.log(NUM_DATA_POINTS));
+    configuration.setFloat(StreamingKMeansDriver.ESTIMATED_DISTANCE_CUTOFF, 
(float) DISTANCE_CUTOFF);
+    configuration.setInt(StreamingKMeansDriver.MAX_NUM_ITERATIONS, 
MAX_NUM_ITERATIONS);
+
+    // Collapse the Centroids in the reducer.
+    configuration.setBoolean(StreamingKMeansDriver.REDUCE_STREAMING_KMEANS, 
true);
+  }
+
+  @Parameterized.Parameters
+  public static List<Object[]> generateData() {
+    return Arrays.asList(new Object[][]{
+        {ProjectionSearch.class.getName(), 
SquaredEuclideanDistanceMeasure.class.getName()},
+        {FastProjectionSearch.class.getName(), 
SquaredEuclideanDistanceMeasure.class.getName()},
+        {LocalitySensitiveHashSearch.class.getName(), 
SquaredEuclideanDistanceMeasure.class.getName()},
+    });
+  }
+
+  @Test
+  public void testHypercubeMapper() throws IOException {
+    MapDriver<Writable, VectorWritable, IntWritable, CentroidWritable> 
mapDriver =
+        MapDriver.newMapDriver(new StreamingKMeansMapper());
+    configure(mapDriver.getConfiguration());
+    System.out.printf("%s mapper test\n",
+        
mapDriver.getConfiguration().get(StreamingKMeansDriver.SEARCHER_CLASS_OPTION));
+    for (Centroid datapoint : syntheticData.getFirst()) {
+      mapDriver.addInput(new IntWritable(0), new VectorWritable(datapoint));
+    }
+    List<org.apache.hadoop.mrunit.types.Pair<IntWritable,CentroidWritable>> 
results = mapDriver.run();
+    BruteSearch resultSearcher = new BruteSearch(new 
SquaredEuclideanDistanceMeasure());
+    for (org.apache.hadoop.mrunit.types.Pair<IntWritable, CentroidWritable> 
result : results) {
+      resultSearcher.add(result.getSecond().getCentroid());
+    }
+    System.out.printf("Clustered the data into %d clusters\n", results.size());
+    for (Vector mean : syntheticData.getSecond()) {
+      WeightedThing<Vector> closest = resultSearcher.search(mean, 1).get(0);
+      assertTrue("Weight " + closest.getWeight() + " not less than 0.5", 
closest.getWeight() < 0.5);
+    }
+  }
+
+  @Test
+  public void testMapperVsLocal() throws IOException {
+    // Clusters the data using the StreamingKMeansMapper.
+    MapDriver<Writable, VectorWritable, IntWritable, CentroidWritable> 
mapDriver =
+        MapDriver.newMapDriver(new StreamingKMeansMapper());
+    Configuration configuration = mapDriver.getConfiguration();
+    configure(configuration);
+    System.out.printf("%s mapper vs local test\n",
+        
mapDriver.getConfiguration().get(StreamingKMeansDriver.SEARCHER_CLASS_OPTION));
+
+    for (Centroid datapoint : syntheticData.getFirst()) {
+      mapDriver.addInput(new IntWritable(0), new VectorWritable(datapoint));
+    }
+    List<Centroid> mapperCentroids = Lists.newArrayList();
+    for (org.apache.hadoop.mrunit.types.Pair<IntWritable, CentroidWritable> 
pair : mapDriver.run()) {
+      mapperCentroids.add(pair.getSecond().getCentroid());
+    }
+
+    // Clusters the data using local batch StreamingKMeans.
+    StreamingKMeans batchClusterer =
+        new 
StreamingKMeans(StreamingKMeansUtilsMR.searcherFromConfiguration(configuration),
+            mapDriver.getConfiguration().getInt("estimatedNumMapClusters", 
-1), DISTANCE_CUTOFF);
+    batchClusterer.cluster(syntheticData.getFirst());
+    List<Centroid> batchCentroids = Lists.newArrayList();
+    for (Vector v : batchClusterer) {
+      batchCentroids.add((Centroid) v);
+    }
+
+    // Clusters the data using point by point StreamingKMeans.
+    StreamingKMeans perPointClusterer =
+        new 
StreamingKMeans(StreamingKMeansUtilsMR.searcherFromConfiguration(configuration),
+            (1 << NUM_DIMENSIONS) * (int)Math.log(NUM_DATA_POINTS), 
DISTANCE_CUTOFF);
+    for (Centroid datapoint : syntheticData.getFirst()) {
+      perPointClusterer.cluster(datapoint);
+    }
+    List<Centroid> perPointCentroids = Lists.newArrayList();
+    for (Vector v : perPointClusterer) {
+      perPointCentroids.add((Centroid) v);
+    }
+
+    // Computes the cost (total sum of distances) of these different 
clusterings.
+    double mapperCost = 
ClusteringUtils.totalClusterCost(syntheticData.getFirst(), mapperCentroids);
+    double localCost = 
ClusteringUtils.totalClusterCost(syntheticData.getFirst(), batchCentroids);
+    double perPointCost = 
ClusteringUtils.totalClusterCost(syntheticData.getFirst(), perPointCentroids);
+    System.out.printf("[Total cost] Mapper %f [%d] Local %f [%d] Perpoint 
local %f [%d];" +
+        "[ratio m-vs-l %f] [ratio pp-vs-l %f]\n", mapperCost, 
mapperCentroids.size(),
+        localCost, batchCentroids.size(), perPointCost, 
perPointCentroids.size(),
+        mapperCost / localCost, perPointCost / localCost);
+
+    // These ratios should be close to 1.0 and have been observed to be go as 
low as 0.6 and as low as 1.5.
+    // A buffer of [0.2, 1.8] seems appropriate.
+    assertEquals("Mapper StreamingKMeans / Batch local StreamingKMeans total 
cost ratio too far from 1",
+        1.0, mapperCost / localCost, 0.8);
+    assertEquals("One by one local StreamingKMeans / Batch local 
StreamingKMeans total cost ratio too high",
+        1.0, perPointCost / localCost, 0.8);
+  }
+
+  @Test
+  public void testHypercubeReducer() throws IOException {
+    ReduceDriver<IntWritable, CentroidWritable, IntWritable, CentroidWritable> 
reduceDriver =
+        ReduceDriver.newReduceDriver(new StreamingKMeansReducer());
+    Configuration configuration = reduceDriver.getConfiguration();
+    configure(configuration);
+
+    System.out.printf("%s reducer test\n", 
configuration.get(StreamingKMeansDriver.SEARCHER_CLASS_OPTION));
+    StreamingKMeans clusterer =
+        new StreamingKMeans(StreamingKMeansUtilsMR 
.searcherFromConfiguration(configuration),
+            (1 << NUM_DIMENSIONS) * (int)Math.log(NUM_DATA_POINTS), 
DISTANCE_CUTOFF);
+
+    long start = System.currentTimeMillis();
+    clusterer.cluster(syntheticData.getFirst());
+    long end = System.currentTimeMillis();
+
+    System.out.printf("%f [s]\n", (end - start) / 1000.0);
+    List<CentroidWritable> reducerInputs = Lists.newArrayList();
+    int postMapperTotalWeight = 0;
+    for (Centroid intermediateCentroid : clusterer) {
+      reducerInputs.add(new CentroidWritable(intermediateCentroid));
+      postMapperTotalWeight += intermediateCentroid.getWeight();
+    }
+
+    reduceDriver.addInput(new IntWritable(0), reducerInputs);
+    List<org.apache.hadoop.mrunit.types.Pair<IntWritable, CentroidWritable>> 
results =
+        reduceDriver.run();
+    testReducerResults(postMapperTotalWeight, results);
+  }
+
+  @Test
+  public void testHypercubeMapReduce() throws IOException {
+    MapReduceDriver<Writable, VectorWritable, IntWritable, CentroidWritable, 
IntWritable, CentroidWritable>
+        mapReduceDriver = new MapReduceDriver<Writable, VectorWritable, 
IntWritable, CentroidWritable,
+        IntWritable, CentroidWritable>(new StreamingKMeansMapper(), new 
StreamingKMeansReducer());
+    Configuration configuration = mapReduceDriver.getConfiguration();
+    configure(configuration);
+
+    System.out.printf("%s full test\n", 
configuration.get(StreamingKMeansDriver.SEARCHER_CLASS_OPTION));
+    for (Centroid datapoint : syntheticData.getFirst()) {
+      mapReduceDriver.addInput(new IntWritable(0), new 
VectorWritable(datapoint));
+    }
+    List<org.apache.hadoop.mrunit.types.Pair<IntWritable, CentroidWritable>> 
results = mapReduceDriver.run();
+    testReducerResults(syntheticData.getFirst().size(), results);
+  }
+
+  @Test
+  public void testHypercubeMapReduceRunSequentially() throws Exception {
+    Configuration configuration = getConfiguration();
+    configure(configuration);
+    configuration.set(DefaultOptionCreator.METHOD_OPTION, 
DefaultOptionCreator.SEQUENTIAL_METHOD);
+
+    Path inputPath = new Path("testInput");
+    Path outputPath = new Path("testOutput");
+    
StreamingKMeansUtilsMR.writeVectorsToSequenceFile(syntheticData.getFirst(), 
inputPath, configuration);
+
+    StreamingKMeansDriver.run(configuration, inputPath, outputPath);
+
+    testReducerResults(syntheticData.getFirst().size(),
+        Lists.newArrayList(Iterables.transform(
+            new SequenceFileIterable<IntWritable, 
CentroidWritable>(outputPath, configuration),
+            new Function<
+                Pair<IntWritable, CentroidWritable>,
+                org.apache.hadoop.mrunit.types.Pair<IntWritable, 
CentroidWritable>>() {
+              @Override
+              public org.apache.hadoop.mrunit.types.Pair<IntWritable, 
CentroidWritable> apply(
+                  org.apache.mahout.common.Pair<IntWritable, CentroidWritable> 
input) {
+                return new org.apache.hadoop.mrunit.types.Pair<IntWritable, 
CentroidWritable>(
+                    input.getFirst(), input.getSecond());
+              }
+            })));
+  }
+
+  private static void testReducerResults(int totalWeight, 
List<org.apache.hadoop.mrunit.types.Pair<IntWritable,
+      CentroidWritable>> results) {
+    int expectedNumClusters = 1 << NUM_DIMENSIONS;
+    double expectedWeight = (double) totalWeight / expectedNumClusters;
+    int numClusters = 0;
+    int numUnbalancedClusters = 0;
+    int totalReducerWeight = 0;
+    for (org.apache.hadoop.mrunit.types.Pair<IntWritable, CentroidWritable> 
result : results) {
+      if (result.getSecond().getCentroid().getWeight() != expectedWeight) {
+        System.out.printf("Unbalanced weight %f in centroid %d\n",  
result.getSecond().getCentroid().getWeight(),
+            result.getSecond().getCentroid().getIndex());
+        ++numUnbalancedClusters;
+      }
+      assertEquals("Final centroid index is invalid", numClusters, 
result.getFirst().get());
+      totalReducerWeight += result.getSecond().getCentroid().getWeight();
+      ++numClusters;
+    }
+    System.out.printf("%d clusters are unbalanced\n", numUnbalancedClusters);
+    assertEquals("Invalid total weight", totalWeight, totalReducerWeight);
+    assertEquals("Invalid number of clusters", 1 << NUM_DIMENSIONS, 
numClusters);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/test/java/org/apache/mahout/clustering/streaming/tools/ResplitSequenceFilesTest.java
----------------------------------------------------------------------
diff --git 
a/mr/src/test/java/org/apache/mahout/clustering/streaming/tools/ResplitSequenceFilesTest.java
 
b/mr/src/test/java/org/apache/mahout/clustering/streaming/tools/ResplitSequenceFilesTest.java
new file mode 100644
index 0000000..2d790e5
--- /dev/null
+++ 
b/mr/src/test/java/org/apache/mahout/clustering/streaming/tools/ResplitSequenceFilesTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.streaming.tools;
+
+import com.google.common.collect.Iterables;
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
+import org.junit.Test;
+
+public class ResplitSequenceFilesTest extends MahoutTestCase {
+
+  @Test
+  public void testSplitting() throws Exception {
+
+    Path inputFile = new Path(getTestTempDirPath("input"), "test.seq");
+    Path output = getTestTempDirPath("output");
+    Configuration conf = new Configuration();
+    LocalFileSystem fs = FileSystem.getLocal(conf);
+
+    SequenceFile.Writer writer = null;
+    try {
+      writer = SequenceFile.createWriter(fs, conf, inputFile, 
IntWritable.class, IntWritable.class);
+      writer.append(new IntWritable(1), new IntWritable(1));
+      writer.append(new IntWritable(2), new IntWritable(2));
+      writer.append(new IntWritable(3), new IntWritable(3));
+      writer.append(new IntWritable(4), new IntWritable(4));
+      writer.append(new IntWritable(5), new IntWritable(5));
+      writer.append(new IntWritable(6), new IntWritable(6));
+      writer.append(new IntWritable(7), new IntWritable(7));
+      writer.append(new IntWritable(8), new IntWritable(8));
+    } finally {
+      Closeables.close(writer, false);
+    }
+
+    String splitPattern = "split";
+    int numSplits = 4;
+
+    ResplitSequenceFiles.main(new String[] { "--input", inputFile.toString(),
+        "--output", output.toString() + "/" + splitPattern, "--numSplits", 
String.valueOf(numSplits) });
+
+    FileStatus[] statuses = HadoopUtil.getFileStatus(output, PathType.LIST, 
PathFilters.logsCRCFilter(), null, conf);
+
+    for (FileStatus status : statuses) {
+      String name = status.getPath().getName();
+      assertTrue(name.startsWith(splitPattern));
+      assertEquals(2, numEntries(status, conf));
+    }
+    assertEquals(numSplits, statuses.length);
+  }
+
+  private int numEntries(FileStatus status, Configuration conf) {
+    return Iterables.size(new SequenceFileIterable(status.getPath(), conf));
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/test/java/org/apache/mahout/clustering/topdown/PathDirectoryTest.java
----------------------------------------------------------------------
diff --git 
a/mr/src/test/java/org/apache/mahout/clustering/topdown/PathDirectoryTest.java 
b/mr/src/test/java/org/apache/mahout/clustering/topdown/PathDirectoryTest.java
new file mode 100644
index 0000000..66b66e3
--- /dev/null
+++ 
b/mr/src/test/java/org/apache/mahout/clustering/topdown/PathDirectoryTest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.mahout.clustering.topdown;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.mahout.common.MahoutTestCase;
+import org.junit.Test;
+
+import java.io.File;
+
+public final class PathDirectoryTest extends MahoutTestCase {
+  
+  private final Path output = new Path("output");
+  
+  @Test
+  public void shouldReturnTopLevelClusterPath() {
+    Path expectedPath = new Path(output, 
PathDirectory.TOP_LEVEL_CLUSTER_DIRECTORY);
+    assertEquals(expectedPath, PathDirectory.getTopLevelClusterPath(output));
+  }
+  
+  @Test
+  public void shouldReturnClusterPostProcessorOutputDirectory() {
+    Path expectedPath = new Path(output, PathDirectory.POST_PROCESS_DIRECTORY);
+    assertEquals(expectedPath, 
PathDirectory.getClusterPostProcessorOutputDirectory(output));
+  }
+  
+  @Test
+  public void shouldReturnClusterOutputClusteredPoints() {
+    Path expectedPath = new Path(output, 
PathDirectory.CLUSTERED_POINTS_DIRECTORY + File.separator + '*');
+    assertEquals(expectedPath, 
PathDirectory.getClusterOutputClusteredPoints(output));
+  }
+  
+  @Test
+  public void shouldReturnBottomLevelClusterPath() {
+    Path expectedPath = new Path(output + File.separator
+                                 + 
PathDirectory.BOTTOM_LEVEL_CLUSTER_DIRECTORY + File.separator
+                                 + '1');
+    assertEquals(expectedPath, PathDirectory.getBottomLevelClusterPath(output, 
"1"));
+  }
+  
+  @Test
+  public void shouldReturnClusterPathForClusterId() {
+    Path expectedPath = new 
Path(PathDirectory.getClusterPostProcessorOutputDirectory(output), new 
Path("1"));
+    assertEquals(expectedPath, PathDirectory.getClusterPathForClusterId(
+        PathDirectory.getClusterPostProcessorOutputDirectory(output), "1"));
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/test/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterCountReaderTest.java
----------------------------------------------------------------------
diff --git 
a/mr/src/test/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterCountReaderTest.java
 
b/mr/src/test/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterCountReaderTest.java
new file mode 100644
index 0000000..0934ff7
--- /dev/null
+++ 
b/mr/src/test/java/org/apache/mahout/clustering/topdown/postprocessor/ClusterCountReaderTest.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.topdown.postprocessor;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.ClusteringTestUtils;
+import org.apache.mahout.clustering.canopy.CanopyDriver;
+import org.apache.mahout.clustering.classify.WeightedVectorWritable;
+import org.apache.mahout.clustering.kmeans.KMeansDriver;
+import org.apache.mahout.common.DummyOutputCollector;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.distance.ManhattanDistanceMeasure;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public final class ClusterCountReaderTest extends MahoutTestCase {
+  
+  public static final double[][] REFERENCE = { {1, 1}, {2, 1}, {1, 2}, {4, 4}, 
{5, 4}, {4, 5}, {5, 5}};
+  
+  private FileSystem fs;
+  private Path outputPathForCanopy;
+  private Path outputPathForKMeans;
+  
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    Configuration conf = getConfiguration();
+    fs = FileSystem.get(conf);
+  }
+  
+  public static List<VectorWritable> getPointsWritable(double[][] raw) {
+    List<VectorWritable> points = Lists.newArrayList();
+    for (double[] fr : raw) {
+      Vector vec = new RandomAccessSparseVector(fr.length);
+      vec.assign(fr);
+      points.add(new VectorWritable(vec));
+    }
+    return points;
+  }
+  
+  /**
+   * Story: User wants to use cluster post processor after canopy clustering 
and then run clustering on the
+   * output clusters
+   */
+  @Test
+  public void testGetNumberOfClusters() throws Exception {
+    List<VectorWritable> points = getPointsWritable(REFERENCE);
+    
+    Path pointsPath = getTestTempDirPath("points");
+    Configuration conf = getConfiguration();
+    ClusteringTestUtils.writePointsToFile(points, new Path(pointsPath, 
"file1"), fs, conf);
+    ClusteringTestUtils.writePointsToFile(points, new Path(pointsPath, 
"file2"), fs, conf);
+    
+    outputPathForCanopy = getTestTempDirPath("canopy");
+    outputPathForKMeans = getTestTempDirPath("kmeans");
+    
+    topLevelClustering(pointsPath, conf);
+    
+    int numberOfClusters = 
ClusterCountReader.getNumberOfClusters(outputPathForKMeans, conf);
+    Assert.assertEquals(2, numberOfClusters);
+    verifyThatNumberOfClustersIsCorrect(conf, new Path(outputPathForKMeans, 
new Path("clusteredPoints")));
+    
+  }
+  
+  private void topLevelClustering(Path pointsPath, Configuration conf) throws 
IOException,
+                                                                      
InterruptedException,
+                                                                      
ClassNotFoundException {
+    DistanceMeasure measure = new ManhattanDistanceMeasure();
+    CanopyDriver.run(conf, pointsPath, outputPathForCanopy, measure, 4.0, 3.0, 
true, 0.0, true);
+    Path clustersIn = new Path(outputPathForCanopy, new 
Path(Cluster.CLUSTERS_DIR + '0'
+                                                                   + 
Cluster.FINAL_ITERATION_SUFFIX));
+    KMeansDriver.run(conf, pointsPath, clustersIn, outputPathForKMeans, 1, 1, 
true, 0.0, true);
+  }
+  
+  private static void verifyThatNumberOfClustersIsCorrect(Configuration conf, 
Path clusteredPointsPath) {
+    DummyOutputCollector<IntWritable,WeightedVectorWritable> collector =
+        new DummyOutputCollector<IntWritable,WeightedVectorWritable>();
+    
+    // The key is the clusterId, the value is the weighted vector
+    for (Pair<IntWritable,WeightedVectorWritable> record :
+         new SequenceFileIterable<IntWritable,WeightedVectorWritable>(new 
Path(clusteredPointsPath, "part-m-0"),
+                                                                      conf)) {
+      collector.collect(record.getFirst(), record.getSecond());
+    }
+    int clusterSize = collector.getKeys().size();
+    assertEquals(2, clusterSize);
+  }
+  
+}

Reply via email to