http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/lucene/LuceneIterator.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/lucene/LuceneIterator.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/lucene/LuceneIterator.java
new file mode 100644
index 0000000..6a8c659
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/lucene/LuceneIterator.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils.vectors.lucene;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.TreeSet;
+
+import com.google.common.base.Preconditions;
+import org.apache.lucene.index.IndexReader;
+import org.apache.mahout.utils.vectors.TermInfo;
+import org.apache.mahout.vectorizer.Weight;
+
+/**
+ * An {@link java.util.Iterator} over {@link org.apache.mahout.math.Vector}s that uses a Lucene index as the source
+ * for creating the {@link org.apache.mahout.math.Vector}s. The field used to create the vectors currently must have
+ * term vectors stored for it.
+ */
+public class LuceneIterator extends AbstractLuceneIterator {
+
+  protected final Set<String> idFieldSelector;
+  protected final String idField;
+
+  /**
+   * Produce a LuceneIterable that can create the Vector plus normalize it.
+   *
+   * @param indexReader {@link IndexReader} to read the documents from.
+   * @param idField     field containing the id. May be null.
+   * @param field       field to use for the Vector
+   * @param termInfo    termInfo
+   * @param weight      weight
+   * @param normPower   the normalization value. Must be non-negative, or {@link LuceneIterable#NO_NORMALIZING}
+   */
+  public LuceneIterator(IndexReader indexReader, String idField, String field, TermInfo termInfo, Weight weight,
+                        double normPower) {
+    this(indexReader, idField, field, termInfo, weight, normPower, 0.0);
+  }
+
+  /**
+   * @param indexReader {@link IndexReader} to read the documents from.
+   * @param idField    field containing the id. May be null.
+   * @param field      field to use for the Vector
+   * @param termInfo   termInfo
+   * @param weight     weight
+   * @param normPower  the normalization value. Must be non-negative, or {@link LuceneIterable#NO_NORMALIZING}
+   * @param maxPercentErrorDocs maximum fraction of documents that will be tolerated without a term frequency vector. In [0,1].
+   * @see #LuceneIterator(org.apache.lucene.index.IndexReader, String, String, org.apache.mahout.utils.vectors.TermInfo,
+   * org.apache.mahout.vectorizer.Weight, double)
+   */
+  public LuceneIterator(IndexReader indexReader,
+                        String idField,
+                        String field,
+                        TermInfo termInfo,
+                        Weight weight,
+                        double normPower,
+                        double maxPercentErrorDocs) {
+      super(termInfo, normPower, indexReader, weight, maxPercentErrorDocs, field);
+      // term docs(null) is a better way of iterating all the docs in Lucene
+    Preconditions.checkArgument(normPower == LuceneIterable.NO_NORMALIZING || normPower >= 0,
+        "normPower must be non-negative or -1, but normPower = " + normPower);
+    Preconditions.checkArgument(maxPercentErrorDocs >= 0.0 && maxPercentErrorDocs <= 1.0,
+        "Must be: 0.0 <= maxPercentErrorDocs <= 1.0");
+    this.idField = idField;
+    if (idField != null) {
+      idFieldSelector = new TreeSet<>();
+      idFieldSelector.add(idField);
+    } else {
+      /* The field in the index containing the id. If null, then the Lucene internal doc id is used,
+      which is prone to error if the underlying index changes*/
+      idFieldSelector = null;
+    }
+  }
+
+  @Override
+  protected String getVectorName(int documentIndex) throws IOException {
+    String name;
+    if (idField != null) {
+      name = indexReader.document(documentIndex, idFieldSelector).get(idField);
+    } else {
+      name = String.valueOf(documentIndex);
+    }
+    return name;
+  }
+}
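
A minimal usage sketch (not part of the patch) of the iterator added above. It assumes an open IndexReader over an index whose "content" field stores term vectors, and it reuses the CachedTermInfo, TFIDF and LuceneIterable classes that TestClusterDumper exercises later in this commit; the 1/100 document-frequency bounds are illustrative.

    IndexReader reader = DirectoryReader.open(directory);              // index with term vectors stored for "content"
    TermInfo termInfo = new CachedTermInfo(reader, "content", 1, 100);
    Weight weight = new TFIDF();
    Iterable<Vector> vectors = new LuceneIterable(reader, "id", "content", termInfo, weight);
    for (Vector v : vectors) {
      // each vector is named from the "id" field, or from the Lucene doc id when idField is null
      System.out.println(v.asFormatString());
    }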

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/lucene/TFDFMapper.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/lucene/TFDFMapper.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/lucene/TFDFMapper.java
new file mode 100644
index 0000000..5830ccc
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/lucene/TFDFMapper.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils.vectors.lucene;
+
+import org.apache.lucene.util.BytesRef;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.utils.vectors.TermEntry;
+import org.apache.mahout.utils.vectors.TermInfo;
+import org.apache.mahout.vectorizer.Weight;
+
+
+/**
+ * Not thread-safe
+ */
+public class TFDFMapper {
+
+  private Vector vector;
+  
+  private final Weight weight;
+  private long numTerms;
+  private final TermInfo termInfo;
+  private String field;
+  private final int numDocs;
+  
+  public TFDFMapper(int numDocs, Weight weight, TermInfo termInfo) {
+    this.weight = weight;
+    this.termInfo = termInfo;
+    this.numDocs = numDocs;
+  }
+
+  public void setExpectations(String field, long numTerms) {
+    this.field = field;
+    vector = new RandomAccessSparseVector(termInfo.totalTerms(field));
+    this.numTerms = numTerms;
+  }
+  
+  public void map(BytesRef term, int frequency) {
+    TermEntry entry = termInfo.getTermEntry(field, term.utf8ToString());
+    if (entry != null) {
+      vector.setQuick(entry.getTermIdx(), weight.calculate(frequency, entry.getDocFreq(), (int) numTerms, numDocs));
+    }
+  }
+  
+  public Vector getVector() {
+    return this.vector;
+  }
+  
+}
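
A sketch (not part of the patch) of how TFDFMapper is typically driven for one document's term vector. The reader, termInfo and the per-document (term, frequency) pairs are assumed to come from an existing Lucene index; numTermsInDoc and termFrequencies are illustrative names, not part of this API.

    TFDFMapper mapper = new TFDFMapper(reader.numDocs(), new TFIDF(), termInfo);
    mapper.setExpectations("content", numTermsInDoc);                  // field name and number of terms in this document
    for (Map.Entry<String, Integer> tf : termFrequencies.entrySet()) {
      mapper.map(new BytesRef(tf.getKey()), tf.getValue());            // weight each observed term into the sparse vector
    }
    Vector docVector = mapper.getVector();                             // weighted vector for this document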

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/lucene/TermInfoClusterInOut.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/lucene/TermInfoClusterInOut.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/lucene/TermInfoClusterInOut.java
new file mode 100644
index 0000000..b0311c7
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/lucene/TermInfoClusterInOut.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils.vectors.lucene;
+
+import org.apache.mahout.common.RandomUtils;
+
+class TermInfoClusterInOut implements Comparable<TermInfoClusterInOut> {
+
+  private final String term;
+  private final int inClusterDF;
+  private final int outClusterDF;
+  private final double logLikelihoodRatio;
+
+  TermInfoClusterInOut(String term, int inClusterDF, int outClusterDF, double logLikelihoodRatio) {
+    this.term = term;
+    this.inClusterDF = inClusterDF;
+    this.outClusterDF = outClusterDF;
+    this.logLikelihoodRatio = logLikelihoodRatio;
+  }
+
+  @Override
+  public int hashCode() {
+    return term.hashCode() ^ inClusterDF ^ outClusterDF ^ RandomUtils.hashDouble(logLikelihoodRatio);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof TermInfoClusterInOut)) {
+      return false;
+    }
+    TermInfoClusterInOut other = (TermInfoClusterInOut) o;
+    return term.equals(other.getTerm())
+        && inClusterDF == other.getInClusterDF()
+        && outClusterDF == other.getOutClusterDF()
+        && logLikelihoodRatio == other.getLogLikelihoodRatio();
+  }
+
+  @Override
+  public int compareTo(TermInfoClusterInOut that) {
+    int res = Double.compare(that.logLikelihoodRatio, logLikelihoodRatio);
+    if (res == 0) {
+      res = term.compareTo(that.term);
+    }
+    return res;
+  }
+
+  public int getInClusterDiff() {
+    return this.inClusterDF - this.outClusterDF;
+  }
+
+  String getTerm() {
+    return term;
+  }
+
+  int getInClusterDF() {
+    return inClusterDF;
+  }
+
+  int getOutClusterDF() {
+    return outClusterDF;
+  }
+
+  double getLogLikelihoodRatio() {
+    return logLikelihoodRatio;
+  }
+}
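
For context (not part of the patch): compareTo() above orders entries by descending log-likelihood ratio, breaking ties on the term, so a sorted collection yields the most cluster-discriminating terms first. A small illustrative sketch (terms and counts are made up; the class and its accessors are package-private, so this only compiles in the same package):

    SortedSet<TermInfoClusterInOut> ranked = new TreeSet<>();
    ranked.add(new TermInfoClusterInOut("fleece", 12, 3, 42.0));
    ranked.add(new TermInfoClusterInOut("jacket", 5, 8, 7.5));
    for (TermInfoClusterInOut entry : ranked) {
      // "fleece 42.0" prints before "jacket 7.5"
      System.out.println(entry.getTerm() + " " + entry.getLogLikelihoodRatio());
    }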

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/test/java/org/apache/mahout/cf/taste/impl/similarity/jdbc/MySQLJDBCInMemoryItemSimilarityTest.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/test/java/org/apache/mahout/cf/taste/impl/similarity/jdbc/MySQLJDBCInMemoryItemSimilarityTest.java b/community/mahout-mr/integration/src/test/java/org/apache/mahout/cf/taste/impl/similarity/jdbc/MySQLJDBCInMemoryItemSimilarityTest.java
new file mode 100644
index 0000000..463a45f
--- /dev/null
+++ b/community/mahout-mr/integration/src/test/java/org/apache/mahout/cf/taste/impl/similarity/jdbc/MySQLJDBCInMemoryItemSimilarityTest.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.impl.similarity.jdbc;
+
+import org.apache.mahout.cf.taste.impl.TasteTestCase;
+import org.apache.mahout.cf.taste.similarity.ItemSimilarity;
+import org.easymock.EasyMock;
+import org.junit.Test;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+
+public class MySQLJDBCInMemoryItemSimilarityTest extends TasteTestCase {
+
+  @Test
+  public void testMemoryLoad() throws Exception {
+
+    DataSource dataSource = EasyMock.createMock(DataSource.class);
+    Connection connection = EasyMock.createMock(Connection.class);
+    PreparedStatement statement = EasyMock.createMock(PreparedStatement.class);
+    ResultSet resultSet = EasyMock.createMock(ResultSet.class);
+
+    EasyMock.expect(dataSource.getConnection()).andReturn(connection);
+    EasyMock.expect(connection.prepareStatement(MySQLJDBCInMemoryItemSimilarity.DEFAULT_GET_ALL_ITEMSIMILARITIES_SQL,
+        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)).andReturn(statement);
+    statement.setFetchDirection(ResultSet.FETCH_FORWARD);
+    EasyMock.expect(statement.executeQuery()).andReturn(resultSet);
+
+    EasyMock.expect(resultSet.next()).andReturn(true);
+
+    EasyMock.expect(resultSet.getLong(1)).andReturn(1L);
+    EasyMock.expect(resultSet.getLong(2)).andReturn(2L);
+    EasyMock.expect(resultSet.getDouble(3)).andReturn(0.5);
+    EasyMock.expect(resultSet.next()).andReturn(true);
+
+    EasyMock.expect(resultSet.getLong(1)).andReturn(1L);
+    EasyMock.expect(resultSet.getLong(2)).andReturn(3L);
+    EasyMock.expect(resultSet.getDouble(3)).andReturn(0.4);
+    EasyMock.expect(resultSet.next()).andReturn(true);
+
+    EasyMock.expect(resultSet.getLong(1)).andReturn(3L);
+    EasyMock.expect(resultSet.getLong(2)).andReturn(4L);
+    EasyMock.expect(resultSet.getDouble(3)).andReturn(0.1);
+
+    EasyMock.expect(resultSet.next()).andReturn(false);
+
+    resultSet.close();
+    statement.close();
+    connection.close();
+
+    EasyMock.replay(dataSource, connection, statement, resultSet);
+
+    ItemSimilarity similarity = new MySQLJDBCInMemoryItemSimilarity(dataSource);
+
+    assertEquals(0.5, similarity.itemSimilarity(1L, 2L), EPSILON);
+    assertEquals(0.4, similarity.itemSimilarity(1L, 3L), EPSILON);
+    assertEquals(0.1, similarity.itemSimilarity(3L, 4L), EPSILON);
+    assertTrue(Double.isNaN(similarity.itemSimilarity(1L, 4L)));
+
+    EasyMock.verify(dataSource, connection, statement, resultSet);
+  }
+}
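
Outside the mocked test, the same similarity is loaded once from a real DataSource (not part of the patch; the DataSource wiring below is an assumed helper, and the backing table must match DEFAULT_GET_ALL_ITEMSIMILARITIES_SQL):

    DataSource dataSource = obtainMySqlDataSource();                   // illustrative helper; any MySQL-backed DataSource
    ItemSimilarity similarity = new MySQLJDBCInMemoryItemSimilarity(dataSource);
    double sim = similarity.itemSimilarity(1L, 2L);                    // served from the in-memory cache, no further SQL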

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/test/java/org/apache/mahout/clustering/TestClusterDumper.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/test/java/org/apache/mahout/clustering/TestClusterDumper.java b/community/mahout-mr/integration/src/test/java/org/apache/mahout/clustering/TestClusterDumper.java
new file mode 100644
index 0000000..01d46fc
--- /dev/null
+++ b/community/mahout-mr/integration/src/test/java/org/apache/mahout/clustering/TestClusterDumper.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.FieldType;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexOptions;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.store.RAMDirectory;
+import org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver;
+import org.apache.mahout.clustering.kmeans.KMeansDriver;
+import org.apache.mahout.clustering.kmeans.RandomSeedGenerator;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
+import org.apache.mahout.math.NamedVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.utils.clustering.ClusterDumper;
+import org.apache.mahout.utils.vectors.TermEntry;
+import org.apache.mahout.utils.vectors.TermInfo;
+import org.apache.mahout.utils.vectors.lucene.CachedTermInfo;
+import org.apache.mahout.utils.vectors.lucene.LuceneIterable;
+import org.apache.mahout.vectorizer.TFIDF;
+import org.apache.mahout.vectorizer.Weight;
+import org.junit.Before;
+import org.junit.Test;
+
+public final class TestClusterDumper extends MahoutTestCase {
+
+  private static final String[] DOCS = {
+      "The quick red fox jumped over the lazy brown dogs.",
+      "The quick brown fox jumped over the lazy red dogs.",
+      "The quick red cat jumped over the lazy brown dogs.",
+      "The quick brown cat jumped over the lazy red dogs.",
+      "Mary had a little lamb whose fleece was white as snow.",
+      "Mary had a little goat whose fleece was white as snow.",
+      "Mary had a little lamb whose fleece was black as tar.",
+      "Dick had a little goat whose fleece was white as snow.",
+      "Moby Dick is a story of a whale and a man obsessed.",
+      "Moby Bob is a story of a walrus and a man obsessed.",
+      "Moby Dick is a story of a whale and a crazy man.",
+      "The robber wore a black fleece jacket and a baseball cap.",
+      "The robber wore a red fleece jacket and a baseball cap.",
+      "The robber wore a white fleece jacket and a baseball cap.",
+      "The English Springer Spaniel is the best of all dogs."};
+
+  private List<VectorWritable> sampleData;
+
+  private String[] termDictionary;
+
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    Configuration conf = getConfiguration();
+    FileSystem fs = FileSystem.get(conf);
+    // Create test data
+    getSampleData(DOCS);
+    ClusteringTestUtils.writePointsToFile(sampleData, true,
+        getTestTempFilePath("testdata/file1"), fs, conf);
+  }
+
+  private void getSampleData(String[] docs2) throws IOException {
+    sampleData = new ArrayList<>();
+    RAMDirectory directory = new RAMDirectory();
+    try (IndexWriter writer = new IndexWriter(directory,
+        new IndexWriterConfig(new StandardAnalyzer()))){
+      for (int i = 0; i < docs2.length; i++) {
+        Document doc = new Document();
+        Field id = new StringField("id", "doc_" + i, Field.Store.YES);
+        doc.add(id);
+        // Store both position and offset information
+        FieldType fieldType = new FieldType();
+        fieldType.setStored(false);
+        fieldType.setIndexOptions(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS);
+        fieldType.setTokenized(true);
+        fieldType.setStoreTermVectors(true);
+        fieldType.setStoreTermVectorPositions(true);
+        fieldType.setStoreTermVectorOffsets(true);
+        fieldType.freeze();
+        Field text = new Field("content", docs2[i], fieldType);
+        doc.add(text);
+        writer.addDocument(doc);
+      }
+    }
+
+    IndexReader reader = DirectoryReader.open(directory);
+
+    Weight weight = new TFIDF();
+    TermInfo termInfo = new CachedTermInfo(reader, "content", 1, 100);
+
+    int numTerms = 0;
+    for (Iterator<TermEntry> it = termInfo.getAllEntries(); it.hasNext();) {
+      it.next();
+      numTerms++;
+    }
+    termDictionary = new String[numTerms];
+    int i = 0;
+    for (Iterator<TermEntry> it = termInfo.getAllEntries(); it.hasNext();) {
+      String term = it.next().getTerm();
+      termDictionary[i] = term;
+      System.out.println(i + " " + term);
+      i++;
+    }
+    Iterable<Vector> iterable = new LuceneIterable(reader, "id", "content",
+        termInfo, weight);
+
+    i = 0;
+    for (Vector vector : iterable) {
+      assertNotNull(vector);
+      NamedVector namedVector;
+      if (vector instanceof NamedVector) {
+        // rename it for testing purposes
+        namedVector = new NamedVector(((NamedVector) vector).getDelegate(),
+            "P(" + i + ')');
+
+      } else {
+        namedVector = new NamedVector(vector, "P(" + i + ')');
+      }
+      System.out.println(AbstractCluster.formatVector(namedVector,
+          termDictionary));
+      sampleData.add(new VectorWritable(namedVector));
+      i++;
+    }
+  }
+
+  /**
+   * Return the path to the final iteration's clusters
+   */
+  private static Path finalClusterPath(Configuration conf, Path output,
+      int maxIterations) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    for (int i = maxIterations; i >= 0; i--) {
+      Path clusters = new Path(output, "clusters-" + i + "-final");
+      if (fs.exists(clusters)) {
+        return clusters;
+      }
+    }
+    return null;
+  }
+
+  @Test
+  public void testKmeans() throws Exception {
+    DistanceMeasure measure = new EuclideanDistanceMeasure();
+    Path input = getTestTempFilePath("input");
+    Path output = getTestTempDirPath("output");
+    Path initialPoints = new Path(output, Cluster.CLUSTERS_DIR + '0' + Cluster.FINAL_ITERATION_SUFFIX);
+    Configuration conf = getConfiguration();
+    FileSystem fs = FileSystem.get(conf);
+    // Write test data to file
+    ClusteringTestUtils.writePointsToFile(sampleData, input, fs, conf);
+    // Select initial centroids
+    RandomSeedGenerator.buildRandom(conf, input, initialPoints, 8, measure, 1L);
+    // Run k-means
+    Path kMeansOutput = new Path(output, "kmeans");
+    KMeansDriver.run(conf, getTestTempDirPath("testdata"), initialPoints, kMeansOutput, 0.001, 10, true, 0.0, false);
+    // Print out clusters
+    ClusterDumper clusterDumper = new ClusterDumper(finalClusterPath(conf,
+            output, 10), new Path(kMeansOutput, "clusteredPoints"));
+    clusterDumper.printClusters(termDictionary);
+  }
+
+  @Test
+  public void testJsonClusterDumper() throws Exception {
+    DistanceMeasure measure = new EuclideanDistanceMeasure();
+    Path input = getTestTempFilePath("input");
+    Path output = getTestTempDirPath("output");
+    Path initialPoints = new Path(output, Cluster.CLUSTERS_DIR + '0' + Cluster.FINAL_ITERATION_SUFFIX);
+    Configuration conf = getConfiguration();
+    FileSystem fs = FileSystem.get(conf);
+    // Write test data to file
+    ClusteringTestUtils.writePointsToFile(sampleData, input, fs, conf);
+    // Select initial centroids
+    RandomSeedGenerator.buildRandom(conf, input, initialPoints, 8, measure, 1L);
+    // Run k-means
+    Path kmeansOutput = new Path(output, "kmeans");
+    KMeansDriver.run(conf, getTestTempDirPath("testdata"), initialPoints, kmeansOutput, 0.001, 10, true, 0.0, false);
+    // Print out clusters
+    ClusterDumper clusterDumper = new ClusterDumper(finalClusterPath(conf,
+        output, 10), new Path(kmeansOutput, "clusteredPoints"));
+    clusterDumper.setOutputFormat(ClusterDumper.OUTPUT_FORMAT.JSON);
+    clusterDumper.printClusters(termDictionary);
+  }
+
+  @Test
+  public void testFuzzyKmeans() throws Exception {
+    DistanceMeasure measure = new EuclideanDistanceMeasure();
+    Path input = getTestTempFilePath("input");
+    Path output = getTestTempDirPath("output");
+    Path initialPoints = new Path(output, Cluster.CLUSTERS_DIR + '0' + Cluster.FINAL_ITERATION_SUFFIX);
+    Configuration conf = getConfiguration();
+    FileSystem fs = FileSystem.get(conf);
+    // Write test data to file
+    ClusteringTestUtils.writePointsToFile(sampleData, input, fs, conf);
+    // Select initial centroids
+    RandomSeedGenerator.buildRandom(conf, input, initialPoints, 8, measure, 1L);
+    // Run k-means
+    Path kMeansOutput = new Path(output, "kmeans");
+    FuzzyKMeansDriver.run(conf, getTestTempDirPath("testdata"), initialPoints, kMeansOutput, 0.001, 10, 1.1f, true,
+        true, 0, true);
+    // run ClusterDumper
+    ClusterDumper clusterDumper = new ClusterDumper(finalClusterPath(conf,
+        output, 10), new Path(kMeansOutput, "clusteredPoints"));
+    clusterDumper.printClusters(termDictionary);
+  }
+}
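
Condensed from the tests above (not part of the patch): the driver pipeline that testKmeans() exercises, namely write the vectors, seed eight initial centroids, run k-means for up to ten iterations, then dump the clusters against the term dictionary. The input/output paths here are illustrative placeholders.

    ClusteringTestUtils.writePointsToFile(sampleData, input, fs, conf);
    RandomSeedGenerator.buildRandom(conf, input, initialPoints, 8, measure, 1L);
    KMeansDriver.run(conf, input, initialPoints, kMeansOutput, 0.001, 10, true, 0.0, false);
    ClusterDumper clusterDumper = new ClusterDumper(finalClusterPath(conf, output, 10),
        new Path(kMeansOutput, "clusteredPoints"));
    clusterDumper.printClusters(termDictionary);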

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/test/java/org/apache/mahout/clustering/TestClusterEvaluator.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/test/java/org/apache/mahout/clustering/TestClusterEvaluator.java b/community/mahout-mr/integration/src/test/java/org/apache/mahout/clustering/TestClusterEvaluator.java
new file mode 100644
index 0000000..8a226a0
--- /dev/null
+++ b/community/mahout-mr/integration/src/test/java/org/apache/mahout/clustering/TestClusterEvaluator.java
@@ -0,0 +1,321 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.mahout.clustering.canopy.Canopy;
+import org.apache.mahout.clustering.canopy.CanopyDriver;
+import org.apache.mahout.clustering.evaluation.ClusterEvaluator;
+import org.apache.mahout.clustering.evaluation.RepresentativePointsDriver;
+import org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver;
+import org.apache.mahout.clustering.kmeans.KMeansDriver;
+import org.apache.mahout.clustering.kmeans.TestKmeansClustering;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.VectorWritable;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public final class TestClusterEvaluator extends MahoutTestCase {
+  
+  private static final double[][] REFERENCE = { {1, 1}, {2, 1}, {1, 2}, {2, 2}, {3, 3}, {4, 4}, {5, 4}, {4, 5}, {5, 5}};
+  
+  private List<VectorWritable> referenceData = Lists.newArrayList();
+  
+  private final List<VectorWritable> sampleData = Lists.newArrayList();
+  
+  private Map<Integer,List<VectorWritable>> representativePoints;
+  
+  private List<Cluster> clusters;
+  
+  private static final Logger log = LoggerFactory.getLogger(TestClusterEvaluator.class);
+  
+  private Configuration conf;
+  
+  private FileSystem fs;
+  
+  private Path testdata;
+  
+  private Path output;
+  
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    conf = getConfiguration();
+    fs = FileSystem.get(conf);
+    testdata = getTestTempDirPath("testdata");
+    output = getTestTempDirPath("output");
+    // Create small reference data set
+    referenceData = TestKmeansClustering.getPointsWritable(REFERENCE);
+    // generate larger test data set for the clustering tests to chew on
+    generateSamples();
+  }
+  
+  /**
+   * Generate random samples and add them to the sampleData
+   * 
+   * @param num
+   *          int number of samples to generate
+   * @param mx
+   *          double x-value of the sample mean
+   * @param my
+   *          double y-value of the sample mean
+   * @param sd
+   *          double standard deviation of the samples
+   */
+  private void generateSamples(int num, double mx, double my, double sd) {
+    log.info("Generating {} samples m=[{}, {}] sd={}", num, mx, my, sd);
+    for (int i = 0; i < num; i++) {
+      sampleData.add(new VectorWritable(new DenseVector(new double[] {UncommonDistributions.rNorm(mx, sd),
+          UncommonDistributions.rNorm(my, sd)})));
+    }
+  }
+  
+  private void generateSamples() {
+    generateSamples(500, 1, 1, 3);
+    generateSamples(300, 1, 0, 0.5);
+    generateSamples(300, 0, 2, 0.1);
+  }
+  
+  private void printRepPoints(int numIterations) {
+    RepresentativePointsDriver.printRepresentativePoints(output, numIterations);
+  }
+  
+  /**
+   * Initialize synthetic data using 4 clusters dC units from origin having 4 representative points dP from each center
+   * 
+   * @param dC
+   *          a double cluster center offset
+   * @param dP
+   *          a double representative point offset
+   * @param measure
+   *          the DistanceMeasure
+   */
+  private void initData(double dC, double dP, DistanceMeasure measure) {
+    clusters = Lists.newArrayList();
+    clusters.add(new Canopy(new DenseVector(new double[] {-dC, -dC}), 1, measure));
+    clusters.add(new Canopy(new DenseVector(new double[] {-dC, dC}), 3, measure));
+    clusters.add(new Canopy(new DenseVector(new double[] {dC, dC}), 5, measure));
+    clusters.add(new Canopy(new DenseVector(new double[] {dC, -dC}), 7, measure));
+    representativePoints = Maps.newHashMap();
+    for (Cluster cluster : clusters) {
+      List<VectorWritable> points = Lists.newArrayList();
+      representativePoints.put(cluster.getId(), points);
+      points.add(new VectorWritable(cluster.getCenter().clone()));
+      points.add(new VectorWritable(cluster.getCenter().plus(new DenseVector(new double[] {dP, dP}))));
+      points.add(new VectorWritable(cluster.getCenter().plus(new DenseVector(new double[] {dP, -dP}))));
+      points.add(new VectorWritable(cluster.getCenter().plus(new DenseVector(new double[] {-dP, -dP}))));
+      points.add(new VectorWritable(cluster.getCenter().plus(new DenseVector(new double[] {-dP, dP}))));
+    }
+  }
+
+  @Test
+  public void testRepresentativePoints() throws Exception {
+    ClusteringTestUtils.writePointsToFile(referenceData, new Path(testdata, "file1"), fs, conf);
+    DistanceMeasure measure = new EuclideanDistanceMeasure();
+    Configuration conf = getConfiguration();
+    // run using MR reference point calculation
+    CanopyDriver.run(conf, testdata, output, measure, 3.1, 1.1, true, 0.0, true);
+    int numIterations = 2;
+    Path clustersIn = new Path(output, "clusters-0-final");
+    RepresentativePointsDriver.run(conf, clustersIn, new Path(output, "clusteredPoints"), output, measure,
+        numIterations, false);
+    printRepPoints(numIterations);
+    ClusterEvaluator evaluatorMR = new ClusterEvaluator(conf, clustersIn);
+    // now run again using sequential reference point calculation
+    HadoopUtil.delete(conf, output);
+    CanopyDriver.run(conf, testdata, output, measure, 3.1, 1.1, true, 0.0, true);
+    RepresentativePointsDriver.run(conf, clustersIn, new Path(output, "clusteredPoints"), output, measure,
+        numIterations, true);
+    printRepPoints(numIterations);
+    ClusterEvaluator evaluatorSeq = new ClusterEvaluator(conf, clustersIn);
+    // compare results
+    assertEquals("InterCluster Density", evaluatorMR.interClusterDensity(), evaluatorSeq.interClusterDensity(), EPSILON);
+    assertEquals("IntraCluster Density", evaluatorMR.intraClusterDensity(), evaluatorSeq.intraClusterDensity(), EPSILON);
+  }
+  
+  @Test
+  public void testCluster0() throws IOException {
+    ClusteringTestUtils.writePointsToFile(referenceData, new Path(testdata, "file1"), fs, conf);
+    DistanceMeasure measure = new EuclideanDistanceMeasure();
+    initData(1, 0.25, measure);
+    ClusterEvaluator evaluator = new ClusterEvaluator(representativePoints, clusters, measure);
+    assertEquals("inter cluster density", 0.33333333333333315, evaluator.interClusterDensity(), EPSILON);
+    assertEquals("intra cluster density", 0.3656854249492381, evaluator.intraClusterDensity(), EPSILON);
+  }
+  
+  @Test
+  public void testCluster1() throws IOException {
+    ClusteringTestUtils.writePointsToFile(referenceData, new Path(testdata, "file1"), fs, conf);
+    DistanceMeasure measure = new EuclideanDistanceMeasure();
+    initData(1, 0.5, measure);
+    ClusterEvaluator evaluator = new ClusterEvaluator(representativePoints, clusters, measure);
+    assertEquals("inter cluster density", 0.33333333333333315, evaluator.interClusterDensity(), EPSILON);
+    assertEquals("intra cluster density", 0.3656854249492381, evaluator.intraClusterDensity(), EPSILON);
+  }
+  
+  @Test
+  public void testCluster2() throws IOException {
+    ClusteringTestUtils.writePointsToFile(referenceData, new Path(testdata, "file1"), fs, conf);
+    DistanceMeasure measure = new EuclideanDistanceMeasure();
+    initData(1, 0.75, measure);
+    ClusterEvaluator evaluator = new ClusterEvaluator(representativePoints, clusters, measure);
+    assertEquals("inter cluster density", 0.33333333333333315, evaluator.interClusterDensity(), EPSILON);
+    assertEquals("intra cluster density", 0.3656854249492381, evaluator.intraClusterDensity(), EPSILON);
+  }
+  
+  /**
+   * adding an empty cluster should modify the inter cluster density but not change the intra-cluster density as that
+   * cluster would have NaN as its intra-cluster density and NaN values are ignored by the evaluator
+   * 
+   * @throws IOException
+   */
+  @Test
+  public void testEmptyCluster() throws IOException {
+    ClusteringTestUtils.writePointsToFile(referenceData, new Path(testdata, "file1"), fs, conf);
+    DistanceMeasure measure = new EuclideanDistanceMeasure();
+    initData(1, 0.25, measure);
+    Canopy cluster = new Canopy(new DenseVector(new double[] {10, 10}), 19, measure);
+    clusters.add(cluster);
+    List<VectorWritable> points = Lists.newArrayList();
+    representativePoints.put(cluster.getId(), points);
+    ClusterEvaluator evaluator = new ClusterEvaluator(representativePoints, clusters, measure);
+    assertEquals("inter cluster density", 0.371534146934532, evaluator.interClusterDensity(), EPSILON);
+    assertEquals("intra cluster density", 0.3656854249492381, evaluator.intraClusterDensity(), EPSILON);
+  }
+  
+  /**
+   * adding a single-valued cluster should modify the inter cluster density but not change the intra-cluster density as
+   * that cluster would have NaN as its intra-cluster density and NaN values are ignored by the evaluator
+   * 
+   * @throws IOException
+   */
+  @Test
+  public void testSingleValueCluster() throws IOException {
+    ClusteringTestUtils.writePointsToFile(referenceData, new Path(testdata, "file1"), fs, conf);
+    DistanceMeasure measure = new EuclideanDistanceMeasure();
+    initData(1, 0.25, measure);
+    Canopy cluster = new Canopy(new DenseVector(new double[] {0, 0}), 19, measure);
+    clusters.add(cluster);
+    List<VectorWritable> points = Lists.newArrayList();
+    points.add(new VectorWritable(cluster.getCenter().plus(new DenseVector(new double[] {1, 1}))));
+    representativePoints.put(cluster.getId(), points);
+    ClusterEvaluator evaluator = new ClusterEvaluator(representativePoints, clusters, measure);
+    assertEquals("inter cluster density", 0.3656854249492381, evaluator.interClusterDensity(), EPSILON);
+    assertEquals("intra cluster density", 0.3656854249492381, evaluator.intraClusterDensity(), EPSILON);
+  }
+  
+  /**
+   * Representative points extraction will duplicate the cluster center if the cluster has no assigned points. These
+   * clusters are included in the inter-cluster density but their NaN intra-density values are ignored by the evaluator.
+   * 
+   * @throws IOException
+   */
+  @Test
+  public void testAllSameValueCluster() throws IOException {
+    ClusteringTestUtils.writePointsToFile(referenceData, new Path(testdata, "file1"), fs, conf);
+    DistanceMeasure measure = new EuclideanDistanceMeasure();
+    initData(1, 0.25, measure);
+    Canopy cluster = new Canopy(new DenseVector(new double[] {0, 0}), 19, measure);
+    clusters.add(cluster);
+    List<VectorWritable> points = Lists.newArrayList();
+    points.add(new VectorWritable(cluster.getCenter()));
+    points.add(new VectorWritable(cluster.getCenter()));
+    points.add(new VectorWritable(cluster.getCenter()));
+    representativePoints.put(cluster.getId(), points);
+    ClusterEvaluator evaluator = new ClusterEvaluator(representativePoints, clusters, measure);
+    assertEquals("inter cluster density", 0.3656854249492381, evaluator.interClusterDensity(), EPSILON);
+    assertEquals("intra cluster density", 0.3656854249492381, evaluator.intraClusterDensity(), EPSILON);
+  }
+  
+  @Test
+  public void testCanopy() throws Exception {
+    ClusteringTestUtils.writePointsToFile(sampleData, new Path(testdata, "file1"), fs, conf);
+    DistanceMeasure measure = new EuclideanDistanceMeasure();
+    Configuration conf = getConfiguration();
+    CanopyDriver.run(conf, testdata, output, measure, 3.1, 1.1, true, 0.0, true);
+    int numIterations = 10;
+    Path clustersIn = new Path(output, "clusters-0-final");
+    RepresentativePointsDriver.run(conf, clustersIn, new Path(output, "clusteredPoints"), output, measure,
+        numIterations, true);
+    //printRepPoints(numIterations);
+    ClusterEvaluator evaluator = new ClusterEvaluator(conf, clustersIn);
+    // now print out the Results
+    System.out.println("Intra-cluster density = " + evaluator.intraClusterDensity());
+    System.out.println("Inter-cluster density = " + evaluator.interClusterDensity());
+  }
+  
+  @Test
+  public void testKmeans() throws Exception {
+    ClusteringTestUtils.writePointsToFile(sampleData, new Path(testdata, "file1"), fs, conf);
+    DistanceMeasure measure = new EuclideanDistanceMeasure();
+    // now run the Canopy job to prime kMeans canopies
+    Configuration conf = getConfiguration();
+    CanopyDriver.run(conf, testdata, output, measure, 3.1, 1.1, false, 0.0, true);
+    // now run the KMeans job
+    Path kmeansOutput = new Path(output, "kmeans");
+    KMeansDriver.run(testdata, new Path(output, "clusters-0-final"), kmeansOutput, 0.001, 10, true, 0.0, true);
+    int numIterations = 10;
+    Path clustersIn = new Path(kmeansOutput, "clusters-2");
+    RepresentativePointsDriver.run(conf, clustersIn, new Path(kmeansOutput, "clusteredPoints"), kmeansOutput, measure,
+        numIterations, true);
+    RepresentativePointsDriver.printRepresentativePoints(kmeansOutput, numIterations);
+    ClusterEvaluator evaluator = new ClusterEvaluator(conf, clustersIn);
+    // now print out the Results
+    System.out.println("Intra-cluster density = " + evaluator.intraClusterDensity());
+    System.out.println("Inter-cluster density = " + evaluator.interClusterDensity());
+  }
+  
+  @Test
+  public void testFuzzyKmeans() throws Exception {
+    ClusteringTestUtils.writePointsToFile(sampleData, new Path(testdata, "file1"), fs, conf);
+    DistanceMeasure measure = new EuclideanDistanceMeasure();
+    // now run the Canopy job to prime kMeans canopies
+    Configuration conf = getConfiguration();
+    CanopyDriver.run(conf, testdata, output, measure, 3.1, 1.1, false, 0.0, true);
+    Path fuzzyKMeansOutput = new Path(output, "fuzzyk");
+    // now run the KMeans job
+    FuzzyKMeansDriver.run(testdata, new Path(output, "clusters-0-final"), fuzzyKMeansOutput, 0.001, 10, 2,
+        true, true, 0, true);
+    int numIterations = 10;
+    Path clustersIn = new Path(fuzzyKMeansOutput, "clusters-4");
+    RepresentativePointsDriver.run(conf, clustersIn, new Path(fuzzyKMeansOutput, "clusteredPoints"), fuzzyKMeansOutput,
+        measure, numIterations, true);
+    RepresentativePointsDriver.printRepresentativePoints(fuzzyKMeansOutput, numIterations);
+    ClusterEvaluator evaluator = new ClusterEvaluator(conf, clustersIn);
+    // now print out the Results
+    System.out.println("Intra-cluster density = " + evaluator.intraClusterDensity());
+    System.out.println("Inter-cluster density = " + evaluator.interClusterDensity());
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/test/java/org/apache/mahout/clustering/cdbw/TestCDbwEvaluator.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/test/java/org/apache/mahout/clustering/cdbw/TestCDbwEvaluator.java b/community/mahout-mr/integration/src/test/java/org/apache/mahout/clustering/cdbw/TestCDbwEvaluator.java
new file mode 100644
index 0000000..597ed01
--- /dev/null
+++ b/community/mahout-mr/integration/src/test/java/org/apache/mahout/clustering/cdbw/TestCDbwEvaluator.java
@@ -0,0 +1,326 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.cdbw;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.ClusteringTestUtils;
+import org.apache.mahout.clustering.TestClusterEvaluator;
+import org.apache.mahout.clustering.UncommonDistributions;
+import org.apache.mahout.clustering.canopy.Canopy;
+import org.apache.mahout.clustering.canopy.CanopyDriver;
+import org.apache.mahout.clustering.evaluation.RepresentativePointsDriver;
+import org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver;
+import org.apache.mahout.clustering.kmeans.KMeansDriver;
+import org.apache.mahout.clustering.kmeans.TestKmeansClustering;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class TestCDbwEvaluator extends MahoutTestCase {
+  
+  private static final double[][] REFERENCE = { {1, 1}, {2, 1}, {1, 2}, {2, 2}, {3, 3}, {4, 4}, {5, 4}, {4, 5}, {5, 5}};
+  
+  private static final Logger log = LoggerFactory.getLogger(TestClusterEvaluator.class);
+  
+  private Map<Integer,List<VectorWritable>> representativePoints;
+  
+  private List<Cluster> clusters;
+  
+  private Configuration conf;
+  
+  private FileSystem fs;
+  
+  private final Collection<VectorWritable> sampleData = new ArrayList<>();
+  
+  private List<VectorWritable> referenceData = new ArrayList<>();
+  
+  private Path testdata;
+  
+  private Path output;
+  
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    conf = getConfiguration();
+    fs = FileSystem.get(conf);
+    testdata = getTestTempDirPath("testdata");
+    output = getTestTempDirPath("output");
+    // Create small reference data set
+    referenceData = TestKmeansClustering.getPointsWritable(REFERENCE);
+    // generate larger test data set for the clustering tests to chew on
+    generateSamples();
+  }
+  
+  /**
+   * Initialize synthetic data using 4 clusters dC units from origin having 4 representative points dP from each center
+   * 
+   * @param dC
+   *          a double cluster center offset
+   * @param dP
+   *          a double representative point offset
+   * @param measure
+   *          the DistanceMeasure
+   */
+  private void initData(double dC, double dP, DistanceMeasure measure) {
+    clusters = new ArrayList<>();
+    clusters.add(new Canopy(new DenseVector(new double[] {-dC, -dC}), 1, measure));
+    clusters.add(new Canopy(new DenseVector(new double[] {-dC, dC}), 3, measure));
+    clusters.add(new Canopy(new DenseVector(new double[] {dC, dC}), 5, measure));
+    clusters.add(new Canopy(new DenseVector(new double[] {dC, -dC}), 7, measure));
+    representativePoints = new HashMap<>();
+    for (Cluster cluster : clusters) {
+      List<VectorWritable> points = new ArrayList<>();
+      representativePoints.put(cluster.getId(), points);
+      points.add(new VectorWritable(cluster.getCenter().clone()));
+      points.add(new VectorWritable(cluster.getCenter().plus(new DenseVector(new double[] {dP, dP}))));
+      points.add(new VectorWritable(cluster.getCenter().plus(new DenseVector(new double[] {dP, -dP}))));
+      points.add(new VectorWritable(cluster.getCenter().plus(new DenseVector(new double[] {-dP, -dP}))));
+      points.add(new VectorWritable(cluster.getCenter().plus(new DenseVector(new double[] {-dP, dP}))));
+    }
+  }
+  
+  /**
+   * Generate random samples and add them to the sampleData
+   * 
+   * @param num
+   *          int number of samples to generate
+   * @param mx
+   *          double x-value of the sample mean
+   * @param my
+   *          double y-value of the sample mean
+   * @param sd
+   *          double standard deviation of the samples
+   */
+  private void generateSamples(int num, double mx, double my, double sd) {
+    log.info("Generating {} samples m=[{}, {}] sd={}", num, mx, my, sd);
+    for (int i = 0; i < num; i++) {
+      sampleData.add(new VectorWritable(new DenseVector(new double[] {UncommonDistributions.rNorm(mx, sd),
+          UncommonDistributions.rNorm(my, sd)})));
+    }
+  }
+  
+  private void generateSamples() {
+    generateSamples(500, 1, 1, 3);
+    generateSamples(300, 1, 0, 0.5);
+    generateSamples(300, 0, 2, 0.1);
+  }
+  
+  @Test
+  public void testCDbw0() throws IOException {
+    ClusteringTestUtils.writePointsToFile(referenceData, getTestTempFilePath("testdata/file1"), fs, conf);
+    DistanceMeasure measure = new EuclideanDistanceMeasure();
+    initData(1, 0.25, measure);
+    CDbwEvaluator evaluator = new CDbwEvaluator(representativePoints, clusters, measure);
+    System.out.println("CDbw = " + evaluator.getCDbw());
+    System.out.println("Intra-cluster density = " + evaluator.intraClusterDensity());
+    System.out.println("Inter-cluster density = " + evaluator.interClusterDensity());
+    System.out.println("Separation = " + evaluator.separation());
+  }
+  
+  @Test
+  public void testCDbw1() throws IOException {
+    ClusteringTestUtils.writePointsToFile(referenceData, getTestTempFilePath("testdata/file1"), fs, conf);
+    DistanceMeasure measure = new EuclideanDistanceMeasure();
+    initData(1, 0.5, measure);
+    CDbwEvaluator evaluator = new CDbwEvaluator(representativePoints, clusters, measure);
+    System.out.println("CDbw = " + evaluator.getCDbw());
+    System.out.println("Intra-cluster density = " + evaluator.intraClusterDensity());
+    System.out.println("Inter-cluster density = " + evaluator.interClusterDensity());
+    System.out.println("Separation = " + evaluator.separation());
+  }
+  
+  @Test
+  public void testCDbw2() throws IOException {
+    ClusteringTestUtils.writePointsToFile(referenceData, getTestTempFilePath("testdata/file1"), fs, conf);
+    DistanceMeasure measure = new EuclideanDistanceMeasure();
+    initData(1, 0.75, measure);
+    CDbwEvaluator evaluator = new CDbwEvaluator(representativePoints, clusters, measure);
+    System.out.println("CDbw = " + evaluator.getCDbw());
+    System.out.println("Intra-cluster density = " + evaluator.intraClusterDensity());
+    System.out.println("Inter-cluster density = " + evaluator.interClusterDensity());
+    System.out.println("Separation = " + evaluator.separation());
+  }
+  
+  @Test
+  public void testEmptyCluster() throws IOException {
+    ClusteringTestUtils.writePointsToFile(referenceData, getTestTempFilePath("testdata/file1"), fs, conf);
+    DistanceMeasure measure = new EuclideanDistanceMeasure();
+    initData(1, 0.25, measure);
+    Canopy cluster = new Canopy(new DenseVector(new double[] {10, 10}), 19, measure);
+    clusters.add(cluster);
+    List<VectorWritable> points = new ArrayList<>();
+    representativePoints.put(cluster.getId(), points);
+    CDbwEvaluator evaluator = new CDbwEvaluator(representativePoints, clusters, measure);
+    System.out.println("CDbw = " + evaluator.getCDbw());
+    System.out.println("Intra-cluster density = " + evaluator.intraClusterDensity());
+    System.out.println("Inter-cluster density = " + evaluator.interClusterDensity());
+    System.out.println("Separation = " + evaluator.separation());
+  }
+  
+  @Test
+  public void testSingleValueCluster() throws IOException {
+    ClusteringTestUtils.writePointsToFile(referenceData, getTestTempFilePath("testdata/file1"), fs, conf);
+    DistanceMeasure measure = new EuclideanDistanceMeasure();
+    initData(1, 0.25, measure);
+    Canopy cluster = new Canopy(new DenseVector(new double[] {0, 0}), 19, measure);
+    clusters.add(cluster);
+    List<VectorWritable> points = new ArrayList<>();
+    points.add(new VectorWritable(cluster.getCenter().plus(new DenseVector(new double[] {1, 1}))));
+    representativePoints.put(cluster.getId(), points);
+    CDbwEvaluator evaluator = new CDbwEvaluator(representativePoints, clusters, measure);
+    System.out.println("CDbw = " + evaluator.getCDbw());
+    System.out.println("Intra-cluster density = " + evaluator.intraClusterDensity());
+    System.out.println("Inter-cluster density = " + evaluator.interClusterDensity());
+    System.out.println("Separation = " + evaluator.separation());
+  }
+  
+  /**
+   * Representative points extraction will duplicate the cluster center if the cluster has no assigned points. These
+   * clusters should be ignored like empty clusters above
+   * 
+   * @throws IOException
+   */
+  @Test
+  public void testAllSameValueCluster() throws IOException {
+    ClusteringTestUtils.writePointsToFile(referenceData, getTestTempFilePath("testdata/file1"), fs, conf);
+    DistanceMeasure measure = new EuclideanDistanceMeasure();
+    initData(1, 0.25, measure);
+    Canopy cluster = new Canopy(new DenseVector(new double[] {0, 0}), 19, measure);
+    clusters.add(cluster);
+    List<VectorWritable> points = new ArrayList<>();
+    points.add(new VectorWritable(cluster.getCenter()));
+    points.add(new VectorWritable(cluster.getCenter()));
+    points.add(new VectorWritable(cluster.getCenter()));
+    representativePoints.put(cluster.getId(), points);
+    CDbwEvaluator evaluator = new CDbwEvaluator(representativePoints, clusters, measure);
+    System.out.println("CDbw = " + evaluator.getCDbw());
+    System.out.println("Intra-cluster density = " + evaluator.intraClusterDensity());
+    System.out.println("Inter-cluster density = " + evaluator.interClusterDensity());
+    System.out.println("Separation = " + evaluator.separation());
+  }
+  
+  /**
+   * Clustering can produce very, very tight clusters that can cause the std calculation to fail. These clusters should
+   * be processed correctly.
+   * 
+   * @throws IOException
+   */
+  @Test
+  public void testAlmostSameValueCluster() throws IOException {
+    ClusteringTestUtils.writePointsToFile(referenceData, getTestTempFilePath("testdata/file1"), fs, conf);
+    DistanceMeasure measure = new EuclideanDistanceMeasure();
+    initData(1, 0.25, measure);
+    Canopy cluster = new Canopy(new DenseVector(new double[] {0, 0}), 19, measure);
+    clusters.add(cluster);
+    List<VectorWritable> points = new ArrayList<>();
+    Vector delta = new DenseVector(new double[] {0, Double.MIN_NORMAL});
+    points.add(new VectorWritable(delta.clone()));
+    points.add(new VectorWritable(delta.clone()));
+    points.add(new VectorWritable(delta.clone()));
+    points.add(new VectorWritable(delta.clone()));
+    points.add(new VectorWritable(delta.clone()));
+    representativePoints.put(cluster.getId(), points);
+    CDbwEvaluator evaluator = new CDbwEvaluator(representativePoints, clusters, measure);
+    System.out.println("CDbw = " + evaluator.getCDbw());
+    System.out.println("Intra-cluster density = " + evaluator.intraClusterDensity());
+    System.out.println("Inter-cluster density = " + evaluator.interClusterDensity());
+    System.out.println("Separation = " + evaluator.separation());
+  }
+  
+  @Test
+  public void testCanopy() throws Exception {
+    ClusteringTestUtils.writePointsToFile(sampleData, getTestTempFilePath("testdata/file1"), fs, conf);
+    DistanceMeasure measure = new EuclideanDistanceMeasure();
+    CanopyDriver.run(getConfiguration(), testdata, output, measure, 3.1, 2.1, true, 0.0, true);
+    int numIterations = 10;
+    Path clustersIn = new Path(output, "clusters-0-final");
+    RepresentativePointsDriver.run(conf, clustersIn, new Path(output, "clusteredPoints"), output, measure,
+        numIterations, true);
+    CDbwEvaluator evaluator = new CDbwEvaluator(conf, clustersIn);
+    // printRepPoints(numIterations);
+    // now print out the Results
+    System.out.println("Canopy CDbw = " + evaluator.getCDbw());
+    System.out.println("Intra-cluster density = " + evaluator.intraClusterDensity());
+    System.out.println("Inter-cluster density = " + evaluator.interClusterDensity());
+    System.out.println("Separation = " + evaluator.separation());
+  }
+  
+  @Test
+  public void testKmeans() throws Exception {
+    ClusteringTestUtils.writePointsToFile(sampleData, getTestTempFilePath("testdata/file1"), fs, conf);
+    DistanceMeasure measure = new EuclideanDistanceMeasure();
+    // now run the Canopy job to prime kMeans canopies
+    CanopyDriver.run(getConfiguration(), testdata, output, measure, 3.1, 2.1, false, 0.0, true);
+    // now run the KMeans job
+    Path kmeansOutput = new Path(output, "kmeans");
+    KMeansDriver.run(testdata, new Path(output, "clusters-0-final"), kmeansOutput, 0.001, 10, true, 0.0, true);
+    int numIterations = 10;
+    Path clustersIn = new Path(kmeansOutput, "clusters-10-final");
+    RepresentativePointsDriver.run(conf, clustersIn, new Path(kmeansOutput, 
"clusteredPoints"), kmeansOutput, measure,
+        numIterations, true);
+    CDbwEvaluator evaluator = new CDbwEvaluator(conf, clustersIn);
+    RepresentativePointsDriver.printRepresentativePoints(kmeansOutput, 
numIterations);
+    // now print out the Results
+    System.out.println("K-Means CDbw = " + evaluator.getCDbw());
+    System.out.println("Intra-cluster density = " + 
evaluator.intraClusterDensity());
+    System.out.println("Inter-cluster density = " + 
evaluator.interClusterDensity());
+    System.out.println("Separation = " + evaluator.separation());
+  }
+  
+  @Test
+  public void testFuzzyKmeans() throws Exception {
+    ClusteringTestUtils.writePointsToFile(sampleData, getTestTempFilePath("testdata/file1"), fs, conf);
+    DistanceMeasure measure = new EuclideanDistanceMeasure();
+    // now run the Canopy job to prime kMeans canopies
+    CanopyDriver.run(getConfiguration(), testdata, output, measure, 3.1, 2.1, false, 0.0, true);
+    Path fuzzyKMeansOutput = new Path(output, "fuzzyk");
+    // now run the FuzzyKMeans job
+    FuzzyKMeansDriver.run(testdata, new Path(output, "clusters-0-final"), fuzzyKMeansOutput, 0.001, 10, 2,
+        true, true, 0, true);
+    int numIterations = 10;
+    Path clustersIn = new Path(fuzzyKMeansOutput, "clusters-4");
+    RepresentativePointsDriver.run(conf, clustersIn, new Path(fuzzyKMeansOutput, "clusteredPoints"), fuzzyKMeansOutput,
+        measure, numIterations, true);
+    CDbwEvaluator evaluator = new CDbwEvaluator(conf, clustersIn);
+    RepresentativePointsDriver.printRepresentativePoints(fuzzyKMeansOutput, numIterations);
+    // now print out the Results
+    System.out.println("Fuzzy K-Means CDbw = " + evaluator.getCDbw());
+    System.out.println("Intra-cluster density = " + evaluator.intraClusterDensity());
+    System.out.println("Inter-cluster density = " + evaluator.interClusterDensity());
+    System.out.println("Separation = " + evaluator.separation());
+  }
+  
+}
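
The three driver-based tests above share one evaluation recipe: run RepresentativePointsDriver over an existing clustering's output, then hand its configuration and final clusters directory to CDbwEvaluator. A condensed sketch of that recipe, with hypothetical paths standing in for a real clustering run (sketch only, not part of this patch):

    // sketch only: the paths below are placeholders for a prior clustering run's output
    Configuration conf = new Configuration();
    Path clustersIn = new Path("/clustering/out/clusters-0-final");      // final clusters
    Path clusteredPoints = new Path("/clustering/out/clusteredPoints");  // per-point assignments
    Path output = new Path("/clustering/out");
    DistanceMeasure measure = new EuclideanDistanceMeasure();
    RepresentativePointsDriver.run(conf, clustersIn, clusteredPoints, output, measure, 10, true);
    CDbwEvaluator evaluator = new CDbwEvaluator(conf, clustersIn);
    System.out.println("CDbw = " + evaluator.getCDbw());                 // combined validity index
    System.out.println("Separation = " + evaluator.separation());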

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/test/java/org/apache/mahout/text/MailArchivesClusteringAnalyzerTest.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/test/java/org/apache/mahout/text/MailArchivesClusteringAnalyzerTest.java b/community/mahout-mr/integration/src/test/java/org/apache/mahout/text/MailArchivesClusteringAnalyzerTest.java
new file mode 100644
index 0000000..ba73c82
--- /dev/null
+++ b/community/mahout-mr/integration/src/test/java/org/apache/mahout/text/MailArchivesClusteringAnalyzerTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text;
+
+import java.io.Reader;
+import java.io.StringReader;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.TokenStream;
+import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
+import org.apache.mahout.common.MahoutTestCase;
+import org.junit.Test;
+
+/**
+ * Unit tests for the MailArchivesClusteringAnalyzer text analyzer.
+ */
+public class MailArchivesClusteringAnalyzerTest extends MahoutTestCase {
+  
+  @Test
+  public void testAnalysis() throws Exception {
+    Analyzer analyzer = new MailArchivesClusteringAnalyzer();
+    
+    String text = "A test message\n"
+                  + "atokenthatistoolongtobeusefulforclustertextanalysis\n"
+                  + "Mahout is a scalable, machine-learning LIBRARY\n"
+                  + "we've added some additional stopwords such as html, mailto, regards\t"
+                  + "apache_hadoop provides the foundation for scalability\n"
+                  + "www.nabble.com [email protected]\n"
+                  + "public void int protected package";
+    Reader reader = new StringReader(text);
+    
+    // tokens are lower-cased, stemmed and stop-word filtered;
+    // if you change the text above, then you may need to change this as well (order matters)
+    String[] expectedTokens = {
+        "test", "mahout", "scalabl", "machin", "learn", "librari", "weve", "ad",
+        "stopword", "apache_hadoop", "provid", "foundat", "scalabl"
+    };
+        
+    TokenStream tokenStream = analyzer.tokenStream("test", reader);
+    assertNotNull(tokenStream);
+    tokenStream.reset();
+    CharTermAttribute termAtt = tokenStream.addAttribute(CharTermAttribute.class);
+    int e = 0;
+    while (tokenStream.incrementToken() && e < expectedTokens.length) {
+      assertEquals(expectedTokens[e++], termAtt.toString());
+    }
+    assertEquals(e, expectedTokens.length);
+    tokenStream.end();
+    tokenStream.close();
+  }
+}
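
The reset / incrementToken / end / close sequence in the test above is the standard way to drain a Lucene TokenStream. A stripped-down sketch of the same loop (the analyzer, field name and text are placeholders, not part of this patch):

    // sketch only: print every term the analyzer emits for a piece of text
    try (TokenStream ts = analyzer.tokenStream("body", new StringReader(text))) {
      CharTermAttribute term = ts.addAttribute(CharTermAttribute.class);
      ts.reset();                      // required before the first incrementToken()
      while (ts.incrementToken()) {
        System.out.println(term.toString());
      }
      ts.end();                        // records the final offset state
    }                                  // close() comes from try-with-resources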

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromMailArchivesTest.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromMailArchivesTest.java b/community/mahout-mr/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromMailArchivesTest.java
new file mode 100644
index 0000000..ef2b8a6
--- /dev/null
+++ b/community/mahout-mr/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromMailArchivesTest.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.text;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.util.zip.GZIPOutputStream;
+
+import org.apache.commons.lang3.SystemUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterator;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test case for the SequenceFilesFromMailArchives command-line application.
+ */
+public final class SequenceFilesFromMailArchivesTest extends MahoutTestCase {
+
+  private File inputDir;
+
+  /**
+   * Create the input and output directories needed for testing
+   * the SequenceFilesFromMailArchives application.
+   */
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    inputDir = getTestTempDir("mail-archives-in");
+
+    // write test mail messages to a gzipped file in a nested directory
+    File subDir = new File(inputDir, "subdir");
+    subDir.mkdir();
+    File gzFile = new File(subDir, "mail-messages.gz");
+    try (GZIPOutputStream gzOut = new GZIPOutputStream(new FileOutputStream(gzFile))) {
+      gzOut.write(testMailMessages.getBytes("UTF-8"));
+      gzOut.finish();
+    }
+    
+    File subDir2 = new File(subDir, "subsubdir");
+    subDir2.mkdir();
+    File gzFile2 = new File(subDir2, "mail-messages-2.gz");
+    try (GZIPOutputStream gzOut = new GZIPOutputStream(new FileOutputStream(gzFile2))) {
+      gzOut.write(testMailMessages.getBytes("UTF-8"));
+      gzOut.finish();
+    }
+  }
+
+  @Test
+  public void testSequential() throws Exception {
+
+    File outputDir = this.getTestTempDir("mail-archives-out");
+
+    String[] args = {
+      "--input", inputDir.getAbsolutePath(),
+      "--output", outputDir.getAbsolutePath(),
+      "--charset", "UTF-8",
+      "--keyPrefix", "TEST",
+      "--method", "sequential",
+      "--body", "--subject", "--separator", ""
+    };
+
+    // run the application's main method
+    SequenceFilesFromMailArchives.main(args);
+
+    // app should create a single SequenceFile named "chunk-0" in the output dir
+    File expectedChunkFile = new File(outputDir, "chunk-0");
+    String expectedChunkPath = expectedChunkFile.getAbsolutePath();
+    Assert.assertTrue("Expected chunk file " + expectedChunkPath + " not found!", expectedChunkFile.isFile());
+
+    Configuration conf = getConfiguration();
+    SequenceFileIterator<Text, Text> iterator = new SequenceFileIterator<>(new Path(expectedChunkPath), true, conf);
+    Assert.assertTrue("First key/value pair not found!", iterator.hasNext());
+    Pair<Text, Text> record = iterator.next();
+
+    File parentFile = new File(new File(new File("TEST"), "subdir"), "mail-messages.gz");
+    Assert.assertEquals(new File(parentFile, testVars[0][0]).toString(), record.getFirst().toString());
+    Assert.assertEquals(testVars[0][1] + testVars[0][2], record.getSecond().toString());
+
+    Assert.assertTrue("Second key/value pair not found!", iterator.hasNext());
+
+    record = iterator.next();
+    Assert.assertEquals(new File(parentFile, testVars[1][0]).toString(), record.getFirst().toString());
+    Assert.assertEquals(testVars[1][1] + testVars[1][2], record.getSecond().toString());
+
+    record = iterator.next();
+    File parentFileSubSubDir = new File(new File(new File(new File("TEST"), "subdir"), "subsubdir"), "mail-messages-2.gz");
+    Assert.assertEquals(new File(parentFileSubSubDir, testVars[0][0]).toString(), record.getFirst().toString());
+    Assert.assertEquals(testVars[0][1] + testVars[0][2], record.getSecond().toString());
+
+    Assert.assertTrue("Fourth key/value pair not found!", iterator.hasNext());
+    record = iterator.next();
+    Assert.assertEquals(new File(parentFileSubSubDir, testVars[1][0]).toString(), record.getFirst().toString());
+    Assert.assertEquals(testVars[1][1] + testVars[1][2], record.getSecond().toString());
+
+    Assert.assertFalse("Only four key/value pairs expected!", iterator.hasNext());
+  }
+
+  @Test
+  public void testMapReduce() throws Exception {
+
+    Path tmpDir = getTestTempDirPath();
+    Path mrOutputDir = new Path(tmpDir, "mail-archives-out-mr");
+    Configuration configuration = getConfiguration();
+    FileSystem fs = FileSystem.get(configuration);
+
+    File expectedInputFile = new File(inputDir.toString());
+
+    String[] args = {
+      "-Dhadoop.tmp.dir=" + configuration.get("hadoop.tmp.dir"),
+      "--input", expectedInputFile.getAbsolutePath(),
+      "--output", mrOutputDir.toString(),
+      "--charset", "UTF-8",
+      "--keyPrefix", "TEST",
+      "--method", "mapreduce",
+      "--body", "--subject", "--separator", ""
+    };
+
+    // run the application's main method
+    SequenceFilesFromMailArchives.main(args);
+
+    // the mapreduce run should create a single sequence file named "part-m-00000" in the output dir
+    FileStatus[] fileStatuses = fs.listStatus(mrOutputDir.suffix("/part-m-00000"));
+    assertEquals(1, fileStatuses.length); // only one
+    assertEquals("part-m-00000", fileStatuses[0].getPath().getName());
+    SequenceFileIterator<Text, Text> iterator =
+      new SequenceFileIterator<>(mrOutputDir.suffix("/part-m-00000"), true, configuration);
+
+    Assert.assertTrue("First key/value pair not found!", iterator.hasNext());
+    Pair<Text, Text> record = iterator.next();
+
+    File parentFileSubSubDir = new File(new File(new File(new File("TEST"), "subdir"), "subsubdir"), "mail-messages-2.gz");
+
+    String expected = record.getFirst().toString();
+    if (SystemUtils.IS_OS_WINDOWS) {
+      expected = expected.replace("/", "\\");
+    }
+    Assert.assertEquals(new File(parentFileSubSubDir, testVars[0][0]).toString(), expected);
+    Assert.assertEquals(testVars[0][1] + testVars[0][2], record.getSecond().toString());
+    Assert.assertTrue("Second key/value pair not found!", iterator.hasNext());
+
+    record = iterator.next();
+    expected = record.getFirst().toString();
+    if (SystemUtils.IS_OS_WINDOWS) {
+      expected = expected.replace("/", "\\");
+    }
+    Assert.assertEquals(new File(parentFileSubSubDir, testVars[1][0]).toString(), expected);
+    Assert.assertEquals(testVars[1][1] + testVars[1][2], record.getSecond().toString());
+
+    // test other file
+    File parentFile = new File(new File(new File("TEST"), "subdir"), "mail-messages.gz");
+    record = iterator.next();
+    expected = record.getFirst().toString();
+    if (SystemUtils.IS_OS_WINDOWS) {
+      expected = expected.replace("/", "\\");
+    }
+    Assert.assertEquals(new File(parentFile, testVars[0][0]).toString(), expected);
+    Assert.assertEquals(testVars[0][1] + testVars[0][2], record.getSecond().toString());
+    Assert.assertTrue("Fourth key/value pair not found!", iterator.hasNext());
+
+    record = iterator.next();
+    expected = record.getFirst().toString();
+    if (SystemUtils.IS_OS_WINDOWS) {
+      expected = expected.replace("/", "\\");
+    }
+    Assert.assertEquals(new File(parentFile, testVars[1][0]).toString(), expected);
+    Assert.assertEquals(testVars[1][1] + testVars[1][2], record.getSecond().toString());
+    Assert.assertFalse("Only four key/value pairs expected!", iterator.hasNext());
+  }
+
+  // Messages extracted and made anonymous from the ASF mail archives
+  private static final String[][] testVars = {
+    new String[] {
+      "[email protected]",
+      "Ant task for JDK1.1 collections build option",
+      "\nThis is just a test message\n--\nTesty McTester\n"
+    },
+    new String[] {
+      "[email protected]",
+      "Problem with build files in several directories",
+      "\nHi all,\nThis is another test message.\nRegards,\nAnother Test\n"
+    }
+  };
+
+  private static final String testMailMessages =
+    "From [email protected]  Mon Jul 24 19:13:53 2000\n"
+      + "Return-Path: <[email protected]>\n"
+      + "Mailing-List: contact [email protected]; run by ezmlm\n"
+      + "Delivered-To: mailing list [email protected]\n"
+      + "Received: (qmail 49267 invoked from network); 24 Jul 2000 19:13:53 -0000\n"
+      + "Message-ID: <" + testVars[0][0] + ">\n"
+      + "From: \"Testy McTester\" <[email protected]>\n"
+      + "To: <[email protected]>\n"
+      + "Subject: " + testVars[0][1] + '\n'
+      + "Date: Mon, 24 Jul 2000 12:24:56 -0700\n"
+      + "MIME-Version: 1.0\n"
+      + "Content-Type: text/plain;\n"
+      + "  charset=\"Windows-1252\"\n"
+      + "Content-Transfer-Encoding: 7bit\n"
+      + "X-Spam-Rating: locus.apache.org 1.6.2 0/1000/N\n"
+      + testVars[0][2] + '\n'
+      + "From [email protected]  Wed Jul 26 11:32:16 2000\n"
+      + "Return-Path: <[email protected]>\n"
+      + "Mailing-List: contact [email protected]; run by ezmlm\n"
+      + "Delivered-To: mailing list [email protected]\n"
+      + "Received: (qmail 73966 invoked from network); 26 Jul 2000 11:32:16 -0000\n"
+      + "User-Agent: Microsoft-Outlook-Express-Macintosh-Edition/5.02.2022\n"
+      + "Date: Wed, 26 Jul 2000 13:32:08 +0200\n"
+      + "Subject: " + testVars[1][1] + '\n'
+      + "From: Another Test <[email protected]>\n"
+      + "To: <[email protected]>\n"
+      + "Message-Id: <" + testVars[1][0] + ">\n"
+      + "Mime-Version: 1.0\n"
+      + "Content-Type: text/plain; charset=\"US-ASCII\"\n"
+      + "Content-Transfer-Encoding: 7bit\n"
+      + "X-Spam-Rating: locus.apache.org 1.6.2 0/1000/N\n"
+      + testVars[1][2];
+}
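
Both tests exercise the tool through its main() entry point, so the same argument list doubles as a usage example. A sketch of the sequential invocation with placeholder paths (the flags mirror the tests above; the paths are hypothetical):

    // sketch only: convert a tree of gzipped mbox archives into SequenceFile chunks
    String[] args = {
        "--input", "/path/to/mail-archives",       // hypothetical input directory
        "--output", "/path/to/sequence-files",     // hypothetical output directory
        "--charset", "UTF-8",
        "--keyPrefix", "TEST",
        "--method", "sequential",                  // or "mapreduce", as in testMapReduce()
        "--body", "--subject", "--separator", ""
    };
    SequenceFilesFromMailArchives.main(args);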

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/test/java/org/apache/mahout/text/TestPathFilter.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/test/java/org/apache/mahout/text/TestPathFilter.java b/community/mahout-mr/integration/src/test/java/org/apache/mahout/text/TestPathFilter.java
new file mode 100644
index 0000000..227521a
--- /dev/null
+++ b/community/mahout-mr/integration/src/test/java/org/apache/mahout/text/TestPathFilter.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.text;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+/**
+ * Dummy Path Filter for testing the MapReduce version of
+ * SequenceFilesFromDirectory
+ */
+public class TestPathFilter implements PathFilter {
+
+  @Override
+  public boolean accept(Path path) {
+    return path.getName().startsWith("t") || path.getName().startsWith("r") || path.getName().startsWith("f");
+  }
+}
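
The filter is only an accept-by-first-letter stub; TestSequenceFilesFromDirectory below wires it into the MapReduce job through the --fileFilterClass option. A quick sketch of what accept() admits, using illustrative file names (not part of the patch):

    // sketch only: names are illustrative
    PathFilter filter = new TestPathFilter();
    filter.accept(new Path("/in/test1"));            // true  ("t")
    filter.accept(new Path("/in/recursive_test2"));  // true  ("r")
    filter.accept(new Path("/in/file.txt"));         // true  ("f")
    filter.accept(new Path("/in/chunk-0"));          // false (skipped by the job)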

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java b/community/mahout-mr/integration/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java
new file mode 100644
index 0000000..040c8e4
--- /dev/null
+++ b/community/mahout-mr/integration/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java
@@ -0,0 +1,313 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.io.Charsets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterator;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class TestSequenceFilesFromDirectory extends MahoutTestCase {
+
+  private static final Logger logger = LoggerFactory.getLogger(TestSequenceFilesFromDirectory.class);
+
+  private static final String[][] DATA1 = {
+    {"test1", "This is the first text."},
+    {"test2", "This is the second text."},
+    {"test3", "This is the third text."}
+  };
+
+  private static final String[][] DATA2 = {
+    {"recursive_test1", "This is the first text."},
+    {"recursive_test2", "This is the second text."},
+    {"recursive_test3", "This is the third text."}
+  };
+
+  @Test
+  public void testSequenceFileFromDirectoryBasic() throws Exception {
+    // parameters
+    Configuration configuration = getConfiguration();
+
+    FileSystem fs = FileSystem.get(configuration);
+
+    // create
+    Path tmpDir = this.getTestTempDirPath();
+    Path inputDir = new Path(tmpDir, "inputDir");
+    fs.mkdirs(inputDir);
+
+    Path outputDir = new Path(tmpDir, "outputDir");
+    Path outputDirRecursive = new Path(tmpDir, "outputDirRecursive");
+
+    Path inputDirRecursive = new Path(tmpDir, "inputDirRecur");
+    fs.mkdirs(inputDirRecursive);
+
+    // prepare input files
+    createFilesFromArrays(configuration, inputDir, DATA1);
+
+    SequenceFilesFromDirectory.main(new String[]{
+      "--input", inputDir.toString(),
+      "--output", outputDir.toString(),
+      "--chunkSize", "64",
+      "--charset", Charsets.UTF_8.name(),
+      "--keyPrefix", "UID",
+      "--method", "sequential"});
+
+    // check output chunk files
+    checkChunkFiles(configuration, outputDir, DATA1, "UID");
+
+    createRecursiveDirFilesFromArrays(configuration, inputDirRecursive, DATA2);
+
+    FileStatus fstInputPath = fs.getFileStatus(inputDirRecursive);
+    String dirs = HadoopUtil.buildDirList(fs, fstInputPath);
+
+    System.out.println("\n\n ----- recursive dirs: " + dirs);
+    SequenceFilesFromDirectory.main(new String[]{
+      "--input", inputDirRecursive.toString(),
+      "--output", outputDirRecursive.toString(),
+      "--chunkSize", "64",
+      "--charset", Charsets.UTF_8.name(),
+      "--keyPrefix", "UID",
+      "--method", "sequential"});
+
+    checkRecursiveChunkFiles(configuration, outputDirRecursive, DATA2, "UID");
+  }
+
+  @Test
+  public void testSequenceFileFromDirectoryMapReduce() throws Exception {
+
+    Configuration conf = getConfiguration();
+
+    FileSystem fs = FileSystem.get(conf);
+
+    // create
+    Path tmpDir = this.getTestTempDirPath();
+    Path inputDir = new Path(tmpDir, "inputDir");
+    fs.mkdirs(inputDir);
+
+    Path inputDirRecur = new Path(tmpDir, "inputDirRecur");
+    fs.mkdirs(inputDirRecur);
+
+    Path mrOutputDir = new Path(tmpDir, "mrOutputDir");
+    Path mrOutputDirRecur = new Path(tmpDir, "mrOutputDirRecur");
+
+    createFilesFromArrays(conf, inputDir, DATA1);
+
+    SequenceFilesFromDirectory.main(new String[]{
+      "-Dhadoop.tmp.dir=" + conf.get("hadoop.tmp.dir"),
+      "--input", inputDir.toString(),
+      "--output", mrOutputDir.toString(),
+      "--chunkSize", "64",
+      "--charset", Charsets.UTF_8.name(),
+      "--method", "mapreduce",
+      "--keyPrefix", "UID",
+      "--fileFilterClass", "org.apache.mahout.text.TestPathFilter"
+    });
+
+    checkMRResultFiles(conf, mrOutputDir, DATA1, "UID");
+
+    createRecursiveDirFilesFromArrays(conf, inputDirRecur, DATA2);
+
+    FileStatus fstInputPath = fs.getFileStatus(inputDirRecur);
+    String dirs = HadoopUtil.buildDirList(fs, fstInputPath);
+
+    logger.info("\n\n ---- recursive dirs: {}", dirs);
+
+    SequenceFilesFromDirectory.main(new String[]{
+      "-Dhadoop.tmp.dir=" + conf.get("hadoop.tmp.dir"),
+      "--input", inputDirRecur.toString(),
+      "--output", mrOutputDirRecur.toString(),
+      "--chunkSize", "64",
+      "--charset", Charsets.UTF_8.name(),
+      "--method", "mapreduce",
+      "--keyPrefix", "UID",
+      "--fileFilterClass", "org.apache.mahout.text.TestPathFilter"
+    });
+
+    checkMRResultFilesRecursive(conf, mrOutputDirRecur, DATA2, "UID");
+  }
+
+
+  private static void createFilesFromArrays(Configuration conf, Path inputDir, String[][] data) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    for (String[] aData : data) {
+      try (OutputStreamWriter writer =
+               new OutputStreamWriter(fs.create(new Path(inputDir, aData[0])), Charsets.UTF_8)) {
+        writer.write(aData[1]);
+      }
+    }
+  }
+
+  private static void createRecursiveDirFilesFromArrays(Configuration configuration, Path inputDir,
+                                                        String[][] data) throws IOException {
+    FileSystem fs = FileSystem.get(configuration);
+
+    logger.info("createRecursiveDirFilesFromArrays > based on: {}", inputDir.toString());
+    Path curPath;
+    String currentRecursiveDir = inputDir.toString();
+
+    for (String[] aData : data) {
+      currentRecursiveDir += "/" + aData[0];
+      File subDir = new File(currentRecursiveDir);
+      subDir.mkdir();
+
+      curPath = new Path(subDir.toString(), "file.txt");
+      logger.info("Created file: {}", curPath.toString());
+
+      try (OutputStreamWriter writer = new OutputStreamWriter(fs.create(curPath), Charsets.UTF_8)) {
+        writer.write(aData[1]);
+      }
+    }
+  }
+
+  private static void checkChunkFiles(Configuration configuration,
+                                      Path outputDir,
+                                      String[][] data,
+                                      String prefix) throws IOException {
+    FileSystem fs = FileSystem.get(configuration);
+
+    // output exists?
+    FileStatus[] fileStatuses = fs.listStatus(outputDir, PathFilters.logsCRCFilter());
+    assertEquals(1, fileStatuses.length); // only one
+    assertEquals("chunk-0", fileStatuses[0].getPath().getName());
+
+    Map<String, String> fileToData = new HashMap<>();
+    for (String[] aData : data) {
+      fileToData.put(prefix + Path.SEPARATOR + aData[0], aData[1]);
+    }
+
+    // read a chunk to check content
+    try (SequenceFileIterator<Text, Text> iterator =
+             new SequenceFileIterator<>(fileStatuses[0].getPath(), true, configuration)) {
+      while (iterator.hasNext()) {
+        Pair<Text, Text> record = iterator.next();
+        String retrievedData = fileToData.get(record.getFirst().toString().trim());
+        assertNotNull(retrievedData);
+        assertEquals(retrievedData, record.getSecond().toString().trim());
+      }
+    }
+  }
+
+  private static void checkRecursiveChunkFiles(Configuration configuration,
+                                               Path outputDir,
+                                               String[][] data,
+                                               String prefix) throws IOException {
+    FileSystem fs = FileSystem.get(configuration);
+
+    System.out.println(" ----------- check_Recursive_ChunkFiles ------------");
+
+    // output exists?
+    FileStatus[] fileStatuses = fs.listStatus(outputDir, PathFilters.logsCRCFilter());
+    assertEquals(1, fileStatuses.length); // only one
+    assertEquals("chunk-0", fileStatuses[0].getPath().getName());
+
+
+    Map<String, String> fileToData = new HashMap<>();
+    String currentPath = prefix;
+    for (String[] aData : data) {
+      currentPath += Path.SEPARATOR + aData[0];
+      fileToData.put(currentPath + Path.SEPARATOR + "file.txt", aData[1]);
+    }
+
+    // read a chunk to check content
+    try (SequenceFileIterator<Text, Text> iterator =
+             new SequenceFileIterator<>(fileStatuses[0].getPath(), true, configuration)) {
+      while (iterator.hasNext()) {
+        Pair<Text, Text> record = iterator.next();
+        String retrievedData = fileToData.get(record.getFirst().toString().trim());
+        System.out.printf("%s >> %s\n", record.getFirst().toString().trim(), record.getSecond().toString().trim());
+
+        assertNotNull(retrievedData);
+        assertEquals(retrievedData, record.getSecond().toString().trim());
+        System.out.printf(">>> k: %s, v: %s\n", record.getFirst().toString(), record.getSecond().toString());
+      }
+    }
+  }
+
+  private static void checkMRResultFiles(Configuration conf, Path outputDir,
+                                         String[][] data, String prefix) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+
+    // output exists?
+    FileStatus[] fileStatuses = fs.listStatus(outputDir.suffix("/part-m-00000"), PathFilters.logsCRCFilter());
+    assertEquals(1, fileStatuses.length); // only one
+    assertEquals("part-m-00000", fileStatuses[0].getPath().getName());
+    Map<String, String> fileToData = new HashMap<>();
+    for (String[] aData : data) {
+      System.out.printf("map.put: %s %s\n", prefix + Path.SEPARATOR + aData[0], aData[1]);
+      fileToData.put(prefix + Path.SEPARATOR + aData[0], aData[1]);
+    }
+
+    // read a chunk to check content
+    try (SequenceFileIterator<Text, Text> iterator = new SequenceFileIterator<>(
+        fileStatuses[0].getPath(), true, conf)) {
+      while (iterator.hasNext()) {
+        Pair<Text, Text> record = iterator.next();
+        String retrievedData = fileToData.get(record.getFirst().toString().trim());
+
+        System.out.printf("MR> %s >> %s\n", record.getFirst().toString().trim(), record.getSecond().toString().trim());
+        assertNotNull(retrievedData);
+        assertEquals(retrievedData, record.getSecond().toString().trim());
+      }
+    }
+  }
+
+  private static void checkMRResultFilesRecursive(Configuration configuration, Path outputDir,
+                                                  String[][] data, String prefix) throws IOException {
+    FileSystem fs = FileSystem.get(configuration);
+
+    // output exists?
+    FileStatus[] fileStatuses = fs.listStatus(outputDir.suffix("/part-m-00000"), PathFilters.logsCRCFilter());
+    assertEquals(1, fileStatuses.length); // only one
+    assertEquals("part-m-00000", fileStatuses[0].getPath().getName());
+    Map<String, String> fileToData = new HashMap<>();
+    String currentPath = prefix;
+
+    for (String[] aData : data) {
+      currentPath += Path.SEPARATOR + aData[0];
+      fileToData.put(currentPath + Path.SEPARATOR + "file.txt", aData[1]);
+    }
+
+    // read a chunk to check content
+    try (SequenceFileIterator<Text, Text> iterator = new SequenceFileIterator<>(
+        fileStatuses[0].getPath(), true, configuration)) {
+      while (iterator.hasNext()) {
+        Pair<Text, Text> record = iterator.next();
+        System.out.printf("MR-Recur > Trying to check: %s\n", record.getFirst().toString().trim());
+        String retrievedData = fileToData.get(record.getFirst().toString().trim());
+        assertNotNull(retrievedData);
+        assertEquals(retrievedData, record.getSecond().toString().trim());
+      }
+    }
+  }
+}
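
The check helpers above also document the key convention the converter uses: each record key is the keyPrefix plus the file's path relative to the input root, and the value is the file's text. A toy sketch of the expected pairs for DATA1 with prefix "UID" (sketch only, mirrors the map built in checkChunkFiles()):

    // sketch only: expected (key -> value) pairs for the non-recursive case
    String prefix = "UID";
    for (String[] aData : DATA1) {
      // e.g. "UID/test1" -> "This is the first text."
      System.out.println(prefix + Path.SEPARATOR + aData[0] + " -> " + aData[1]);
    }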
