http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/lucene/LuceneIterator.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/lucene/LuceneIterator.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/lucene/LuceneIterator.java new file mode 100644 index 0000000..6a8c659 --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/lucene/LuceneIterator.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.utils.vectors.lucene; + +import java.io.IOException; +import java.util.Set; +import java.util.TreeSet; + +import com.google.common.base.Preconditions; +import org.apache.lucene.index.IndexReader; +import org.apache.mahout.utils.vectors.TermInfo; +import org.apache.mahout.vectorizer.Weight; + +/** + * An {@link java.util.Iterator} over {@link org.apache.mahout.math.Vector}s that uses a Lucene index as the source + * for creating the {@link org.apache.mahout.math.Vector}s. The field used to create the vectors currently must have + * term vectors stored for it. + */ +public class LuceneIterator extends AbstractLuceneIterator { + + protected final Set<String> idFieldSelector; + protected final String idField; + + /** + * Produce a LuceneIterable that can create the Vector plus normalize it. + * + * @param indexReader {@link IndexReader} to read the documents from. + * @param idField field containing the id. May be null. + * @param field field to use for the Vector + * @param termInfo termInfo + * @param weight weight + * @param normPower the normalization value. Must be non-negative, or {@link LuceneIterable#NO_NORMALIZING} + */ + public LuceneIterator(IndexReader indexReader, String idField, String field, TermInfo termInfo, Weight weight, + double normPower) { + this(indexReader, idField, field, termInfo, weight, normPower, 0.0); + } + + /** + * @param indexReader {@link IndexReader} to read the documents from. + * @param idField field containing the id. May be null. + * @param field field to use for the Vector + * @param termInfo termInfo + * @param weight weight + * @param normPower the normalization value. Must be non-negative, or {@link LuceneIterable#NO_NORMALIZING} + * @param maxPercentErrorDocs most documents that will be tolerated without a term freq vector. In [0,1]. 
+ * @see #LuceneIterator(org.apache.lucene.index.IndexReader, String, String, org.apache.mahout.utils.vectors.TermInfo, + * org.apache.mahout.vectorizer.Weight, double) + */ + public LuceneIterator(IndexReader indexReader, + String idField, + String field, + TermInfo termInfo, + Weight weight, + double normPower, + double maxPercentErrorDocs) { + super(termInfo, normPower, indexReader, weight, maxPercentErrorDocs, field); + // term docs(null) is a better way of iterating all the docs in Lucene + Preconditions.checkArgument(normPower == LuceneIterable.NO_NORMALIZING || normPower >= 0, + "normPower must be non-negative or -1, but normPower = " + normPower); + Preconditions.checkArgument(maxPercentErrorDocs >= 0.0 && maxPercentErrorDocs <= 1.0, + "Must be: 0.0 <= maxPercentErrorDocs <= 1.0"); + this.idField = idField; + if (idField != null) { + idFieldSelector = new TreeSet<>(); + idFieldSelector.add(idField); + } else { + /*The field in the index containing the index. If null, then the Lucene internal doc id is used + which is prone to error if the underlying index changes*/ + idFieldSelector = null; + } + } + + @Override + protected String getVectorName(int documentIndex) throws IOException { + String name; + if (idField != null) { + name = indexReader.document(documentIndex, idFieldSelector).get(idField); + } else { + name = String.valueOf(documentIndex); + } + return name; + } +}
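For context on how this new iterator is meant to be driven, the TestClusterDumper changes further down in this commit build a TermInfo and a Weight and then walk the index field by field. The following is a minimal standalone sketch of that pattern, not part of the commit itself; the "id" and "content" field names and the CachedTermInfo thresholds (minDf=1, maxDfPercent=100) are assumptions that simply mirror the test code below, and the index is assumed to store term vectors for the content field, as the class Javadoc requires.

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.store.Directory;
import org.apache.mahout.math.Vector;
import org.apache.mahout.utils.vectors.TermInfo;
import org.apache.mahout.utils.vectors.lucene.CachedTermInfo;
import org.apache.mahout.utils.vectors.lucene.LuceneIterable;
import org.apache.mahout.utils.vectors.lucene.LuceneIterator;
import org.apache.mahout.vectorizer.TFIDF;
import org.apache.mahout.vectorizer.Weight;

public class LuceneIteratorSketch {
  // Sketch only: field names and dictionary thresholds are illustrative,
  // taken from the TestClusterDumper usage later in this commit.
  public static void dumpVectors(Directory directory) throws Exception {
    try (IndexReader reader = DirectoryReader.open(directory)) {
      // Build the term dictionary for the "content" field (minDf = 1, maxDfPercent = 100).
      TermInfo termInfo = new CachedTermInfo(reader, "content", 1, 100);
      Weight weight = new TFIDF();
      // One Vector per document; each vector is named by the stored "id" field.
      LuceneIterator docs = new LuceneIterator(reader, "id", "content", termInfo, weight,
          LuceneIterable.NO_NORMALIZING);
      while (docs.hasNext()) {
        Vector v = docs.next();
        System.out.println(v.asFormatString());
      }
    }
  }
}

Passing null for the idField falls back to Lucene's internal document ids, which, as the constructor's own comment warns, is fragile if the underlying index changes between runs.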
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/lucene/TFDFMapper.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/lucene/TFDFMapper.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/lucene/TFDFMapper.java new file mode 100644 index 0000000..5830ccc --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/lucene/TFDFMapper.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.utils.vectors.lucene; + +import org.apache.lucene.util.BytesRef; +import org.apache.mahout.math.RandomAccessSparseVector; +import org.apache.mahout.math.Vector; +import org.apache.mahout.utils.vectors.TermEntry; +import org.apache.mahout.utils.vectors.TermInfo; +import org.apache.mahout.vectorizer.Weight; + + +/** + * Not thread-safe + */ +public class TFDFMapper { + + private Vector vector; + + private final Weight weight; + private long numTerms; + private final TermInfo termInfo; + private String field; + private final int numDocs; + + public TFDFMapper(int numDocs, Weight weight, TermInfo termInfo) { + this.weight = weight; + this.termInfo = termInfo; + this.numDocs = numDocs; + } + + public void setExpectations(String field, long numTerms) { + this.field = field; + vector = new RandomAccessSparseVector(termInfo.totalTerms(field)); + this.numTerms = numTerms; + } + + public void map(BytesRef term, int frequency) { + TermEntry entry = termInfo.getTermEntry(field, term.utf8ToString()); + if (entry != null) { + vector.setQuick(entry.getTermIdx(), weight.calculate(frequency, entry.getDocFreq(), (int)numTerms, numDocs)); + } + } + + public Vector getVector() { + return this.vector; + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/lucene/TermInfoClusterInOut.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/lucene/TermInfoClusterInOut.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/lucene/TermInfoClusterInOut.java new file mode 100644 index 0000000..b0311c7 --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/lucene/TermInfoClusterInOut.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.utils.vectors.lucene; + +import org.apache.mahout.common.RandomUtils; + +class TermInfoClusterInOut implements Comparable<TermInfoClusterInOut> { + + private final String term; + private final int inClusterDF; + private final int outClusterDF; + private final double logLikelihoodRatio; + + TermInfoClusterInOut(String term, int inClusterDF, int outClusterDF, double logLikelihoodRatio) { + this.term = term; + this.inClusterDF = inClusterDF; + this.outClusterDF = outClusterDF; + this.logLikelihoodRatio = logLikelihoodRatio; + } + + @Override + public int hashCode() { + return term.hashCode() ^ inClusterDF ^ outClusterDF ^ RandomUtils.hashDouble(logLikelihoodRatio); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof TermInfoClusterInOut)) { + return false; + } + TermInfoClusterInOut other = (TermInfoClusterInOut) o; + return term.equals(other.getTerm()) + && inClusterDF == other.getInClusterDF() + && outClusterDF == other.getOutClusterDF() + && logLikelihoodRatio == other.getLogLikelihoodRatio(); + } + + @Override + public int compareTo(TermInfoClusterInOut that) { + int res = Double.compare(that.logLikelihoodRatio, logLikelihoodRatio); + if (res == 0) { + res = term.compareTo(that.term); + } + return res; + } + + public int getInClusterDiff() { + return this.inClusterDF - this.outClusterDF; + } + + String getTerm() { + return term; + } + + int getInClusterDF() { + return inClusterDF; + } + + int getOutClusterDF() { + return outClusterDF; + } + + double getLogLikelihoodRatio() { + return logLikelihoodRatio; + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/test/java/org/apache/mahout/cf/taste/impl/similarity/jdbc/MySQLJDBCInMemoryItemSimilarityTest.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/test/java/org/apache/mahout/cf/taste/impl/similarity/jdbc/MySQLJDBCInMemoryItemSimilarityTest.java b/community/mahout-mr/integration/src/test/java/org/apache/mahout/cf/taste/impl/similarity/jdbc/MySQLJDBCInMemoryItemSimilarityTest.java new file mode 100644 index 0000000..463a45f --- /dev/null +++ b/community/mahout-mr/integration/src/test/java/org/apache/mahout/cf/taste/impl/similarity/jdbc/MySQLJDBCInMemoryItemSimilarityTest.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.cf.taste.impl.similarity.jdbc; + +import org.apache.mahout.cf.taste.impl.TasteTestCase; +import org.apache.mahout.cf.taste.similarity.ItemSimilarity; +import org.easymock.EasyMock; +import org.junit.Test; + +import javax.sql.DataSource; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; + +public class MySQLJDBCInMemoryItemSimilarityTest extends TasteTestCase { + + @Test + public void testMemoryLoad() throws Exception { + + DataSource dataSource = EasyMock.createMock(DataSource.class); + Connection connection = EasyMock.createMock(Connection.class); + PreparedStatement statement = EasyMock.createMock(PreparedStatement.class); + ResultSet resultSet = EasyMock.createMock(ResultSet.class); + + EasyMock.expect(dataSource.getConnection()).andReturn(connection); + EasyMock.expect(connection.prepareStatement(MySQLJDBCInMemoryItemSimilarity.DEFAULT_GET_ALL_ITEMSIMILARITIES_SQL, + ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)).andReturn(statement); + statement.setFetchDirection(ResultSet.FETCH_FORWARD); + EasyMock.expect(statement.executeQuery()).andReturn(resultSet); + + EasyMock.expect(resultSet.next()).andReturn(true); + + EasyMock.expect(resultSet.getLong(1)).andReturn(1L); + EasyMock.expect(resultSet.getLong(2)).andReturn(2L); + EasyMock.expect(resultSet.getDouble(3)).andReturn(0.5); + EasyMock.expect(resultSet.next()).andReturn(true); + + EasyMock.expect(resultSet.getLong(1)).andReturn(1L); + EasyMock.expect(resultSet.getLong(2)).andReturn(3L); + EasyMock.expect(resultSet.getDouble(3)).andReturn(0.4); + EasyMock.expect(resultSet.next()).andReturn(true); + + EasyMock.expect(resultSet.getLong(1)).andReturn(3L); + EasyMock.expect(resultSet.getLong(2)).andReturn(4L); + EasyMock.expect(resultSet.getDouble(3)).andReturn(0.1); + + EasyMock.expect(resultSet.next()).andReturn(false); + + resultSet.close(); + statement.close(); + connection.close(); + + EasyMock.replay(dataSource, connection, statement, resultSet); + + ItemSimilarity similarity = new MySQLJDBCInMemoryItemSimilarity(dataSource); + + assertEquals(0.5, similarity.itemSimilarity(1L, 2L), EPSILON); + assertEquals(0.4, similarity.itemSimilarity(1L, 3L), EPSILON); + assertEquals(0.1, similarity.itemSimilarity(3L, 4L), EPSILON); + assertTrue(Double.isNaN(similarity.itemSimilarity(1L, 4L))); + + EasyMock.verify(dataSource, connection, statement, resultSet); + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/test/java/org/apache/mahout/clustering/TestClusterDumper.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/test/java/org/apache/mahout/clustering/TestClusterDumper.java b/community/mahout-mr/integration/src/test/java/org/apache/mahout/clustering/TestClusterDumper.java new file mode 100644 index 0000000..01d46fc --- /dev/null +++ b/community/mahout-mr/integration/src/test/java/org/apache/mahout/clustering/TestClusterDumper.java @@ -0,0 +1,236 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more 
+ * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.clustering; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.lucene.analysis.standard.StandardAnalyzer; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.FieldType; +import org.apache.lucene.document.StringField; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexOptions; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.store.RAMDirectory; +import org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver; +import org.apache.mahout.clustering.kmeans.KMeansDriver; +import org.apache.mahout.clustering.kmeans.RandomSeedGenerator; +import org.apache.mahout.common.MahoutTestCase; +import org.apache.mahout.common.distance.DistanceMeasure; +import org.apache.mahout.common.distance.EuclideanDistanceMeasure; +import org.apache.mahout.math.NamedVector; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorWritable; +import org.apache.mahout.utils.clustering.ClusterDumper; +import org.apache.mahout.utils.vectors.TermEntry; +import org.apache.mahout.utils.vectors.TermInfo; +import org.apache.mahout.utils.vectors.lucene.CachedTermInfo; +import org.apache.mahout.utils.vectors.lucene.LuceneIterable; +import org.apache.mahout.vectorizer.TFIDF; +import org.apache.mahout.vectorizer.Weight; +import org.junit.Before; +import org.junit.Test; + +public final class TestClusterDumper extends MahoutTestCase { + + private static final String[] DOCS = { + "The quick red fox jumped over the lazy brown dogs.", + "The quick brown fox jumped over the lazy red dogs.", + "The quick red cat jumped over the lazy brown dogs.", + "The quick brown cat jumped over the lazy red dogs.", + "Mary had a little lamb whose fleece was white as snow.", + "Mary had a little goat whose fleece was white as snow.", + "Mary had a little lamb whose fleece was black as tar.", + "Dick had a little goat whose fleece was white as snow.", + "Moby Dick is a story of a whale and a man obsessed.", + "Moby Bob is a story of a walrus and a man obsessed.", + "Moby Dick is a story of a whale and a crazy man.", + "The robber wore a black fleece jacket and a baseball cap.", + "The robber wore a red fleece jacket and a baseball cap.", + "The robber wore a white fleece jacket and a baseball cap.", + "The English Springer Spaniel is the best of all dogs."}; + + private List<VectorWritable> sampleData; + + private String[] termDictionary; + + @Override + @Before + 
public void setUp() throws Exception { + super.setUp(); + Configuration conf = getConfiguration(); + FileSystem fs = FileSystem.get(conf); + // Create test data + getSampleData(DOCS); + ClusteringTestUtils.writePointsToFile(sampleData, true, + getTestTempFilePath("testdata/file1"), fs, conf); + } + + private void getSampleData(String[] docs2) throws IOException { + sampleData = new ArrayList<>(); + RAMDirectory directory = new RAMDirectory(); + try (IndexWriter writer = new IndexWriter(directory, + new IndexWriterConfig(new StandardAnalyzer()))){ + for (int i = 0; i < docs2.length; i++) { + Document doc = new Document(); + Field id = new StringField("id", "doc_" + i, Field.Store.YES); + doc.add(id); + // Store both position and offset information + FieldType fieldType = new FieldType(); + fieldType.setStored(false); + fieldType.setIndexOptions(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS); + fieldType.setTokenized(true); + fieldType.setStoreTermVectors(true); + fieldType.setStoreTermVectorPositions(true); + fieldType.setStoreTermVectorOffsets(true); + fieldType.freeze(); + Field text = new Field("content", docs2[i], fieldType); + doc.add(text); + writer.addDocument(doc); + } + } + + IndexReader reader = DirectoryReader.open(directory); + + Weight weight = new TFIDF(); + TermInfo termInfo = new CachedTermInfo(reader, "content", 1, 100); + + int numTerms = 0; + for (Iterator<TermEntry> it = termInfo.getAllEntries(); it.hasNext();) { + it.next(); + numTerms++; + } + termDictionary = new String[numTerms]; + int i = 0; + for (Iterator<TermEntry> it = termInfo.getAllEntries(); it.hasNext();) { + String term = it.next().getTerm(); + termDictionary[i] = term; + System.out.println(i + " " + term); + i++; + } + Iterable<Vector> iterable = new LuceneIterable(reader, "id", "content", + termInfo,weight); + + i = 0; + for (Vector vector : iterable) { + assertNotNull(vector); + NamedVector namedVector; + if (vector instanceof NamedVector) { + // rename it for testing purposes + namedVector = new NamedVector(((NamedVector) vector).getDelegate(), + "P(" + i + ')'); + + } else { + namedVector = new NamedVector(vector, "P(" + i + ')'); + } + System.out.println(AbstractCluster.formatVector(namedVector, + termDictionary)); + sampleData.add(new VectorWritable(namedVector)); + i++; + } + } + + /** + * Return the path to the final iteration's clusters + */ + private static Path finalClusterPath(Configuration conf, Path output, + int maxIterations) throws IOException { + FileSystem fs = FileSystem.get(conf); + for (int i = maxIterations; i >= 0; i--) { + Path clusters = new Path(output, "clusters-" + i + "-final"); + if (fs.exists(clusters)) { + return clusters; + } + } + return null; + } + + @Test + public void testKmeans() throws Exception { + DistanceMeasure measure = new EuclideanDistanceMeasure(); + Path input = getTestTempFilePath("input"); + Path output = getTestTempDirPath("output"); + Path initialPoints = new Path(output, Cluster.CLUSTERS_DIR + '0' + Cluster.FINAL_ITERATION_SUFFIX); + Configuration conf = getConfiguration(); + FileSystem fs = FileSystem.get(conf); + // Write test data to file + ClusteringTestUtils.writePointsToFile(sampleData, input, fs, conf); + // Select initial centroids + RandomSeedGenerator.buildRandom(conf, input, initialPoints, 8, measure, 1L); + // Run k-means + Path kMeansOutput = new Path(output, "kmeans"); + KMeansDriver.run(conf, getTestTempDirPath("testdata"), initialPoints, kMeansOutput, 0.001, 10, true, 0.0, false); + // Print out clusters + ClusterDumper 
clusterDumper = new ClusterDumper(finalClusterPath(conf, + output, 10), new Path(kMeansOutput, "clusteredPoints")); + clusterDumper.printClusters(termDictionary); + } + + @Test + public void testJsonClusterDumper() throws Exception { + DistanceMeasure measure = new EuclideanDistanceMeasure(); + Path input = getTestTempFilePath("input"); + Path output = getTestTempDirPath("output"); + Path initialPoints = new Path(output, Cluster.CLUSTERS_DIR + '0' + Cluster.FINAL_ITERATION_SUFFIX); + Configuration conf = getConfiguration(); + FileSystem fs = FileSystem.get(conf); + // Write test data to file + ClusteringTestUtils.writePointsToFile(sampleData, input, fs, conf); + // Select initial centroids + RandomSeedGenerator.buildRandom(conf, input, initialPoints, 8, measure, 1L); + // Run k-means + Path kmeansOutput = new Path(output, "kmeans"); + KMeansDriver.run(conf, getTestTempDirPath("testdata"), initialPoints, kmeansOutput, 0.001, 10, true, 0.0, false); + // Print out clusters + ClusterDumper clusterDumper = new ClusterDumper(finalClusterPath(conf, + output, 10), new Path(kmeansOutput, "clusteredPoints")); + clusterDumper.setOutputFormat(ClusterDumper.OUTPUT_FORMAT.JSON); + clusterDumper.printClusters(termDictionary); + } + + @Test + public void testFuzzyKmeans() throws Exception { + DistanceMeasure measure = new EuclideanDistanceMeasure(); + Path input = getTestTempFilePath("input"); + Path output = getTestTempDirPath("output"); + Path initialPoints = new Path(output, Cluster.CLUSTERS_DIR + '0' + Cluster.FINAL_ITERATION_SUFFIX); + Configuration conf = getConfiguration(); + FileSystem fs = FileSystem.get(conf); + // Write test data to file + ClusteringTestUtils.writePointsToFile(sampleData, input, fs, conf); + // Select initial centroids + RandomSeedGenerator.buildRandom(conf, input, initialPoints, 8, measure, 1L); + // Run k-means + Path kMeansOutput = new Path(output, "kmeans"); + FuzzyKMeansDriver.run(conf, getTestTempDirPath("testdata"), initialPoints, kMeansOutput, 0.001, 10, 1.1f, true, + true, 0, true); + // run ClusterDumper + ClusterDumper clusterDumper = new ClusterDumper(finalClusterPath(conf, + output, 10), new Path(kMeansOutput, "clusteredPoints")); + clusterDumper.printClusters(termDictionary); + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/test/java/org/apache/mahout/clustering/TestClusterEvaluator.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/test/java/org/apache/mahout/clustering/TestClusterEvaluator.java b/community/mahout-mr/integration/src/test/java/org/apache/mahout/clustering/TestClusterEvaluator.java new file mode 100644 index 0000000..8a226a0 --- /dev/null +++ b/community/mahout-mr/integration/src/test/java/org/apache/mahout/clustering/TestClusterEvaluator.java @@ -0,0 +1,321 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.clustering; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.mahout.clustering.canopy.Canopy; +import org.apache.mahout.clustering.canopy.CanopyDriver; +import org.apache.mahout.clustering.evaluation.ClusterEvaluator; +import org.apache.mahout.clustering.evaluation.RepresentativePointsDriver; +import org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver; +import org.apache.mahout.clustering.kmeans.KMeansDriver; +import org.apache.mahout.clustering.kmeans.TestKmeansClustering; +import org.apache.mahout.common.HadoopUtil; +import org.apache.mahout.common.MahoutTestCase; +import org.apache.mahout.common.distance.DistanceMeasure; +import org.apache.mahout.common.distance.EuclideanDistanceMeasure; +import org.apache.mahout.math.DenseVector; +import org.apache.mahout.math.VectorWritable; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +public final class TestClusterEvaluator extends MahoutTestCase { + + private static final double[][] REFERENCE = { {1, 1}, {2, 1}, {1, 2}, {2, 2}, {3, 3}, {4, 4}, {5, 4}, {4, 5}, {5, 5}}; + + private List<VectorWritable> referenceData = Lists.newArrayList(); + + private final List<VectorWritable> sampleData = Lists.newArrayList(); + + private Map<Integer,List<VectorWritable>> representativePoints; + + private List<Cluster> clusters; + + private static final Logger log = LoggerFactory.getLogger(TestClusterEvaluator.class); + + private Configuration conf; + + private FileSystem fs; + + private Path testdata; + + private Path output; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + conf = getConfiguration(); + fs = FileSystem.get(conf); + testdata = getTestTempDirPath("testdata"); + output = getTestTempDirPath("output"); + // Create small reference data set + referenceData = TestKmeansClustering.getPointsWritable(REFERENCE); + // generate larger test data set for the clustering tests to chew on + generateSamples(); + } + + /** + * Generate random samples and add them to the sampleData + * + * @param num + * int number of samples to generate + * @param mx + * double x-value of the sample mean + * @param my + * double y-value of the sample mean + * @param sd + * double standard deviation of the samples + */ + private void generateSamples(int num, double mx, double my, double sd) { + log.info("Generating {} samples m=[{}, {}] sd={}", num, mx, my, sd); + for (int i = 0; i < num; i++) { + sampleData.add(new VectorWritable(new DenseVector(new double[] {UncommonDistributions.rNorm(mx, sd), + UncommonDistributions.rNorm(my, sd)}))); + } + } + + private void generateSamples() { + generateSamples(500, 1, 1, 3); + generateSamples(300, 1, 0, 0.5); + generateSamples(300, 0, 2, 0.1); + } + + private void printRepPoints(int numIterations) { + 
RepresentativePointsDriver.printRepresentativePoints(output, numIterations); + } + + /** + * Initialize synthetic data using 4 clusters dC units from origin having 4 representative points dP from each center + * + * @param dC + * a double cluster center offset + * @param dP + * a double representative point offset + * @param measure + * the DistanceMeasure + */ + private void initData(double dC, double dP, DistanceMeasure measure) { + clusters = Lists.newArrayList(); + clusters.add(new Canopy(new DenseVector(new double[] {-dC, -dC}), 1, measure)); + clusters.add(new Canopy(new DenseVector(new double[] {-dC, dC}), 3, measure)); + clusters.add(new Canopy(new DenseVector(new double[] {dC, dC}), 5, measure)); + clusters.add(new Canopy(new DenseVector(new double[] {dC, -dC}), 7, measure)); + representativePoints = Maps.newHashMap(); + for (Cluster cluster : clusters) { + List<VectorWritable> points = Lists.newArrayList(); + representativePoints.put(cluster.getId(), points); + points.add(new VectorWritable(cluster.getCenter().clone())); + points.add(new VectorWritable(cluster.getCenter().plus(new DenseVector(new double[] {dP, dP})))); + points.add(new VectorWritable(cluster.getCenter().plus(new DenseVector(new double[] {dP, -dP})))); + points.add(new VectorWritable(cluster.getCenter().plus(new DenseVector(new double[] {-dP, -dP})))); + points.add(new VectorWritable(cluster.getCenter().plus(new DenseVector(new double[] {-dP, dP})))); + } + } + + @Test + public void testRepresentativePoints() throws Exception { + ClusteringTestUtils.writePointsToFile(referenceData, new Path(testdata, "file1"), fs, conf); + DistanceMeasure measure = new EuclideanDistanceMeasure(); + Configuration conf = getConfiguration(); + // run using MR reference point calculation + CanopyDriver.run(conf, testdata, output, measure, 3.1, 1.1, true, 0.0, true); + int numIterations = 2; + Path clustersIn = new Path(output, "clusters-0-final"); + RepresentativePointsDriver.run(conf, clustersIn, new Path(output, "clusteredPoints"), output, measure, + numIterations, false); + printRepPoints(numIterations); + ClusterEvaluator evaluatorMR = new ClusterEvaluator(conf, clustersIn); + // now run again using sequential reference point calculation + HadoopUtil.delete(conf, output); + CanopyDriver.run(conf, testdata, output, measure, 3.1, 1.1, true, 0.0, true); + RepresentativePointsDriver.run(conf, clustersIn, new Path(output, "clusteredPoints"), output, measure, + numIterations, true); + printRepPoints(numIterations); + ClusterEvaluator evaluatorSeq = new ClusterEvaluator(conf, clustersIn); + // compare results + assertEquals("InterCluster Density", evaluatorMR.interClusterDensity(), evaluatorSeq.interClusterDensity(), EPSILON); + assertEquals("IntraCluster Density", evaluatorMR.intraClusterDensity(), evaluatorSeq.intraClusterDensity(), EPSILON); + } + + @Test + public void testCluster0() throws IOException { + ClusteringTestUtils.writePointsToFile(referenceData, new Path(testdata, "file1"), fs, conf); + DistanceMeasure measure = new EuclideanDistanceMeasure(); + initData(1, 0.25, measure); + ClusterEvaluator evaluator = new ClusterEvaluator(representativePoints, clusters, measure); + assertEquals("inter cluster density", 0.33333333333333315, evaluator.interClusterDensity(), EPSILON); + assertEquals("intra cluster density", 0.3656854249492381, evaluator.intraClusterDensity(), EPSILON); + } + + @Test + public void testCluster1() throws IOException { + ClusteringTestUtils.writePointsToFile(referenceData, new Path(testdata, "file1"), fs, 
conf); + DistanceMeasure measure = new EuclideanDistanceMeasure(); + initData(1, 0.5, measure); + ClusterEvaluator evaluator = new ClusterEvaluator(representativePoints, clusters, measure); + assertEquals("inter cluster density", 0.33333333333333315, evaluator.interClusterDensity(), EPSILON); + assertEquals("intra cluster density", 0.3656854249492381, evaluator.intraClusterDensity(), EPSILON); + } + + @Test + public void testCluster2() throws IOException { + ClusteringTestUtils.writePointsToFile(referenceData, new Path(testdata, "file1"), fs, conf); + DistanceMeasure measure = new EuclideanDistanceMeasure(); + initData(1, 0.75, measure); + ClusterEvaluator evaluator = new ClusterEvaluator(representativePoints, clusters, measure); + assertEquals("inter cluster density", 0.33333333333333315, evaluator.interClusterDensity(), EPSILON); + assertEquals("intra cluster density", 0.3656854249492381, evaluator.intraClusterDensity(), EPSILON); + } + + /** + * adding an empty cluster should modify the inter cluster density but not change the intra-cluster density as that + * cluster would have NaN as its intra-cluster density and NaN values are ignored by the evaluator + * + * @throws IOException + */ + @Test + public void testEmptyCluster() throws IOException { + ClusteringTestUtils.writePointsToFile(referenceData, new Path(testdata, "file1"), fs, conf); + DistanceMeasure measure = new EuclideanDistanceMeasure(); + initData(1, 0.25, measure); + Canopy cluster = new Canopy(new DenseVector(new double[] {10, 10}), 19, measure); + clusters.add(cluster); + List<VectorWritable> points = Lists.newArrayList(); + representativePoints.put(cluster.getId(), points); + ClusterEvaluator evaluator = new ClusterEvaluator(representativePoints, clusters, measure); + assertEquals("inter cluster density", 0.371534146934532, evaluator.interClusterDensity(), EPSILON); + assertEquals("intra cluster density", 0.3656854249492381, evaluator.intraClusterDensity(), EPSILON); + } + + /** + * adding an single-valued cluster should modify the inter cluster density but not change the intra-cluster density as + * that cluster would have NaN as its intra-cluster density and NaN values are ignored by the evaluator + * + * @throws IOException + */ + @Test + public void testSingleValueCluster() throws IOException { + ClusteringTestUtils.writePointsToFile(referenceData, new Path(testdata, "file1"), fs, conf); + DistanceMeasure measure = new EuclideanDistanceMeasure(); + initData(1, 0.25, measure); + Canopy cluster = new Canopy(new DenseVector(new double[] {0, 0}), 19, measure); + clusters.add(cluster); + List<VectorWritable> points = Lists.newArrayList(); + points.add(new VectorWritable(cluster.getCenter().plus(new DenseVector(new double[] {1, 1})))); + representativePoints.put(cluster.getId(), points); + ClusterEvaluator evaluator = new ClusterEvaluator(representativePoints, clusters, measure); + assertEquals("inter cluster density", 0.3656854249492381, evaluator.interClusterDensity(), EPSILON); + assertEquals("intra cluster density", 0.3656854249492381, evaluator.intraClusterDensity(), EPSILON); + } + + /** + * Representative points extraction will duplicate the cluster center if the cluster has no assigned points. These + * clusters are included in the inter-cluster density but their NaN intra-density values are ignored by the evaluator. 
+ * + * @throws IOException + */ + @Test + public void testAllSameValueCluster() throws IOException { + ClusteringTestUtils.writePointsToFile(referenceData, new Path(testdata, "file1"), fs, conf); + DistanceMeasure measure = new EuclideanDistanceMeasure(); + initData(1, 0.25, measure); + Canopy cluster = new Canopy(new DenseVector(new double[] {0, 0}), 19, measure); + clusters.add(cluster); + List<VectorWritable> points = Lists.newArrayList(); + points.add(new VectorWritable(cluster.getCenter())); + points.add(new VectorWritable(cluster.getCenter())); + points.add(new VectorWritable(cluster.getCenter())); + representativePoints.put(cluster.getId(), points); + ClusterEvaluator evaluator = new ClusterEvaluator(representativePoints, clusters, measure); + assertEquals("inter cluster density", 0.3656854249492381, evaluator.interClusterDensity(), EPSILON); + assertEquals("intra cluster density", 0.3656854249492381, evaluator.intraClusterDensity(), EPSILON); + } + + @Test + public void testCanopy() throws Exception { + ClusteringTestUtils.writePointsToFile(sampleData, new Path(testdata, "file1"), fs, conf); + DistanceMeasure measure = new EuclideanDistanceMeasure(); + Configuration conf = getConfiguration(); + CanopyDriver.run(conf, testdata, output, measure, 3.1, 1.1, true, 0.0, true); + int numIterations = 10; + Path clustersIn = new Path(output, "clusters-0-final"); + RepresentativePointsDriver.run(conf, clustersIn, new Path(output, "clusteredPoints"), output, measure, + numIterations, true); + //printRepPoints(numIterations); + ClusterEvaluator evaluator = new ClusterEvaluator(conf, clustersIn); + // now print out the Results + System.out.println("Intra-cluster density = " + evaluator.intraClusterDensity()); + System.out.println("Inter-cluster density = " + evaluator.interClusterDensity()); + } + + @Test + public void testKmeans() throws Exception { + ClusteringTestUtils.writePointsToFile(sampleData, new Path(testdata, "file1"), fs, conf); + DistanceMeasure measure = new EuclideanDistanceMeasure(); + // now run the Canopy job to prime kMeans canopies + Configuration conf = getConfiguration(); + CanopyDriver.run(conf, testdata, output, measure, 3.1, 1.1, false, 0.0, true); + // now run the KMeans job + Path kmeansOutput = new Path(output, "kmeans"); + KMeansDriver.run(testdata, new Path(output, "clusters-0-final"), kmeansOutput, 0.001, 10, true, 0.0, true); + int numIterations = 10; + Path clustersIn = new Path(kmeansOutput, "clusters-2"); + RepresentativePointsDriver.run(conf, clustersIn, new Path(kmeansOutput, "clusteredPoints"), kmeansOutput, measure, + numIterations, true); + RepresentativePointsDriver.printRepresentativePoints(kmeansOutput, numIterations); + ClusterEvaluator evaluator = new ClusterEvaluator(conf, clustersIn); + // now print out the Results + System.out.println("Intra-cluster density = " + evaluator.intraClusterDensity()); + System.out.println("Inter-cluster density = " + evaluator.interClusterDensity()); + } + + @Test + public void testFuzzyKmeans() throws Exception { + ClusteringTestUtils.writePointsToFile(sampleData, new Path(testdata, "file1"), fs, conf); + DistanceMeasure measure = new EuclideanDistanceMeasure(); + // now run the Canopy job to prime kMeans canopies + Configuration conf = getConfiguration(); + CanopyDriver.run(conf, testdata, output, measure, 3.1, 1.1, false, 0.0, true); + Path fuzzyKMeansOutput = new Path(output, "fuzzyk"); + // now run the KMeans job + FuzzyKMeansDriver.run(testdata, new Path(output, "clusters-0-final"), fuzzyKMeansOutput, 0.001, 10, 
2, + true, true, 0, true); + int numIterations = 10; + Path clustersIn = new Path(fuzzyKMeansOutput, "clusters-4"); + RepresentativePointsDriver.run(conf, clustersIn, new Path(fuzzyKMeansOutput, "clusteredPoints"), fuzzyKMeansOutput, + measure, numIterations, true); + RepresentativePointsDriver.printRepresentativePoints(fuzzyKMeansOutput, numIterations); + ClusterEvaluator evaluator = new ClusterEvaluator(conf, clustersIn); + // now print out the Results + System.out.println("Intra-cluster density = " + evaluator.intraClusterDensity()); + System.out.println("Inter-cluster density = " + evaluator.interClusterDensity()); + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/test/java/org/apache/mahout/clustering/cdbw/TestCDbwEvaluator.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/test/java/org/apache/mahout/clustering/cdbw/TestCDbwEvaluator.java b/community/mahout-mr/integration/src/test/java/org/apache/mahout/clustering/cdbw/TestCDbwEvaluator.java new file mode 100644 index 0000000..597ed01 --- /dev/null +++ b/community/mahout-mr/integration/src/test/java/org/apache/mahout/clustering/cdbw/TestCDbwEvaluator.java @@ -0,0 +1,326 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.clustering.cdbw; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.mahout.clustering.Cluster; +import org.apache.mahout.clustering.ClusteringTestUtils; +import org.apache.mahout.clustering.TestClusterEvaluator; +import org.apache.mahout.clustering.UncommonDistributions; +import org.apache.mahout.clustering.canopy.Canopy; +import org.apache.mahout.clustering.canopy.CanopyDriver; +import org.apache.mahout.clustering.evaluation.RepresentativePointsDriver; +import org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver; +import org.apache.mahout.clustering.kmeans.KMeansDriver; +import org.apache.mahout.clustering.kmeans.TestKmeansClustering; +import org.apache.mahout.common.MahoutTestCase; +import org.apache.mahout.common.distance.DistanceMeasure; +import org.apache.mahout.common.distance.EuclideanDistanceMeasure; +import org.apache.mahout.math.DenseVector; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorWritable; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class TestCDbwEvaluator extends MahoutTestCase { + + private static final double[][] REFERENCE = { {1, 1}, {2, 1}, {1, 2}, {2, 2}, {3, 3}, {4, 4}, {5, 4}, {4, 5}, {5, 5}}; + + private static final Logger log = LoggerFactory.getLogger(TestClusterEvaluator.class); + + private Map<Integer,List<VectorWritable>> representativePoints; + + private List<Cluster> clusters; + + private Configuration conf; + + private FileSystem fs; + + private final Collection<VectorWritable> sampleData = new ArrayList<>(); + + private List<VectorWritable> referenceData = new ArrayList<>(); + + private Path testdata; + + private Path output; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + conf = getConfiguration(); + fs = FileSystem.get(conf); + testdata = getTestTempDirPath("testdata"); + output = getTestTempDirPath("output"); + // Create small reference data set + referenceData = TestKmeansClustering.getPointsWritable(REFERENCE); + // generate larger test data set for the clustering tests to chew on + generateSamples(); + } + + /** + * Initialize synthetic data using 4 clusters dC units from origin having 4 representative points dP from each center + * + * @param dC + * a double cluster center offset + * @param dP + * a double representative point offset + * @param measure + * the DistanceMeasure + */ + private void initData(double dC, double dP, DistanceMeasure measure) { + clusters = new ArrayList<>(); + clusters.add(new Canopy(new DenseVector(new double[] {-dC, -dC}), 1, measure)); + clusters.add(new Canopy(new DenseVector(new double[] {-dC, dC}), 3, measure)); + clusters.add(new Canopy(new DenseVector(new double[] {dC, dC}), 5, measure)); + clusters.add(new Canopy(new DenseVector(new double[] {dC, -dC}), 7, measure)); + representativePoints = new HashMap<>(); + for (Cluster cluster : clusters) { + List<VectorWritable> points = new ArrayList<>(); + representativePoints.put(cluster.getId(), points); + points.add(new VectorWritable(cluster.getCenter().clone())); + points.add(new VectorWritable(cluster.getCenter().plus(new DenseVector(new double[] {dP, dP})))); + points.add(new VectorWritable(cluster.getCenter().plus(new 
DenseVector(new double[] {dP, -dP})))); + points.add(new VectorWritable(cluster.getCenter().plus(new DenseVector(new double[] {-dP, -dP})))); + points.add(new VectorWritable(cluster.getCenter().plus(new DenseVector(new double[] {-dP, dP})))); + } + } + + /** + * Generate random samples and add them to the sampleData + * + * @param num + * int number of samples to generate + * @param mx + * double x-value of the sample mean + * @param my + * double y-value of the sample mean + * @param sd + * double standard deviation of the samples + */ + private void generateSamples(int num, double mx, double my, double sd) { + log.info("Generating {} samples m=[{}, {}] sd={}", num, mx, my, sd); + for (int i = 0; i < num; i++) { + sampleData.add(new VectorWritable(new DenseVector(new double[] {UncommonDistributions.rNorm(mx, sd), + UncommonDistributions.rNorm(my, sd)}))); + } + } + + private void generateSamples() { + generateSamples(500, 1, 1, 3); + generateSamples(300, 1, 0, 0.5); + generateSamples(300, 0, 2, 0.1); + } + + @Test + public void testCDbw0() throws IOException { + ClusteringTestUtils.writePointsToFile(referenceData, getTestTempFilePath("testdata/file1"), fs, conf); + DistanceMeasure measure = new EuclideanDistanceMeasure(); + initData(1, 0.25, measure); + CDbwEvaluator evaluator = new CDbwEvaluator(representativePoints, clusters, measure); + System.out.println("CDbw = " + evaluator.getCDbw()); + System.out.println("Intra-cluster density = " + evaluator.intraClusterDensity()); + System.out.println("Inter-cluster density = " + evaluator.interClusterDensity()); + System.out.println("Separation = " + evaluator.separation()); + } + + @Test + public void testCDbw1() throws IOException { + ClusteringTestUtils.writePointsToFile(referenceData, getTestTempFilePath("testdata/file1"), fs, conf); + DistanceMeasure measure = new EuclideanDistanceMeasure(); + initData(1, 0.5, measure); + CDbwEvaluator evaluator = new CDbwEvaluator(representativePoints, clusters, measure); + System.out.println("CDbw = " + evaluator.getCDbw()); + System.out.println("Intra-cluster density = " + evaluator.intraClusterDensity()); + System.out.println("Inter-cluster density = " + evaluator.interClusterDensity()); + System.out.println("Separation = " + evaluator.separation()); + } + + @Test + public void testCDbw2() throws IOException { + ClusteringTestUtils.writePointsToFile(referenceData, getTestTempFilePath("testdata/file1"), fs, conf); + DistanceMeasure measure = new EuclideanDistanceMeasure(); + initData(1, 0.75, measure); + CDbwEvaluator evaluator = new CDbwEvaluator(representativePoints, clusters, measure); + System.out.println("CDbw = " + evaluator.getCDbw()); + System.out.println("Intra-cluster density = " + evaluator.intraClusterDensity()); + System.out.println("Inter-cluster density = " + evaluator.interClusterDensity()); + System.out.println("Separation = " + evaluator.separation()); + } + + @Test + public void testEmptyCluster() throws IOException { + ClusteringTestUtils.writePointsToFile(referenceData, getTestTempFilePath("testdata/file1"), fs, conf); + DistanceMeasure measure = new EuclideanDistanceMeasure(); + initData(1, 0.25, measure); + Canopy cluster = new Canopy(new DenseVector(new double[] {10, 10}), 19, measure); + clusters.add(cluster); + List<VectorWritable> points = new ArrayList<>(); + representativePoints.put(cluster.getId(), points); + CDbwEvaluator evaluator = new CDbwEvaluator(representativePoints, clusters, measure); + System.out.println("CDbw = " + evaluator.getCDbw()); + 
System.out.println("Intra-cluster density = " + evaluator.intraClusterDensity()); + System.out.println("Inter-cluster density = " + evaluator.interClusterDensity()); + System.out.println("Separation = " + evaluator.separation()); + } + + @Test + public void testSingleValueCluster() throws IOException { + ClusteringTestUtils.writePointsToFile(referenceData, getTestTempFilePath("testdata/file1"), fs, conf); + DistanceMeasure measure = new EuclideanDistanceMeasure(); + initData(1, 0.25, measure); + Canopy cluster = new Canopy(new DenseVector(new double[] {0, 0}), 19, measure); + clusters.add(cluster); + List<VectorWritable> points = new ArrayList<>(); + points.add(new VectorWritable(cluster.getCenter().plus(new DenseVector(new double[] {1, 1})))); + representativePoints.put(cluster.getId(), points); + CDbwEvaluator evaluator = new CDbwEvaluator(representativePoints, clusters, measure); + System.out.println("CDbw = " + evaluator.getCDbw()); + System.out.println("Intra-cluster density = " + evaluator.intraClusterDensity()); + System.out.println("Inter-cluster density = " + evaluator.interClusterDensity()); + System.out.println("Separation = " + evaluator.separation()); + } + + /** + * Representative points extraction will duplicate the cluster center if the cluster has no assigned points. These + * clusters should be ignored like empty clusters above + * + * @throws IOException + */ + @Test + public void testAllSameValueCluster() throws IOException { + ClusteringTestUtils.writePointsToFile(referenceData, getTestTempFilePath("testdata/file1"), fs, conf); + DistanceMeasure measure = new EuclideanDistanceMeasure(); + initData(1, 0.25, measure); + Canopy cluster = new Canopy(new DenseVector(new double[] {0, 0}), 19, measure); + clusters.add(cluster); + List<VectorWritable> points = new ArrayList<>(); + points.add(new VectorWritable(cluster.getCenter())); + points.add(new VectorWritable(cluster.getCenter())); + points.add(new VectorWritable(cluster.getCenter())); + representativePoints.put(cluster.getId(), points); + CDbwEvaluator evaluator = new CDbwEvaluator(representativePoints, clusters, measure); + System.out.println("CDbw = " + evaluator.getCDbw()); + System.out.println("Intra-cluster density = " + evaluator.intraClusterDensity()); + System.out.println("Inter-cluster density = " + evaluator.interClusterDensity()); + System.out.println("Separation = " + evaluator.separation()); + } + + /** + * Clustering can produce very, very tight clusters that can cause the std calculation to fail. These clusters should + * be processed correctly. 
+ * + * @throws IOException + */ + @Test + public void testAlmostSameValueCluster() throws IOException { + ClusteringTestUtils.writePointsToFile(referenceData, getTestTempFilePath("testdata/file1"), fs, conf); + DistanceMeasure measure = new EuclideanDistanceMeasure(); + initData(1, 0.25, measure); + Canopy cluster = new Canopy(new DenseVector(new double[] {0, 0}), 19, measure); + clusters.add(cluster); + List<VectorWritable> points = new ArrayList<>(); + Vector delta = new DenseVector(new double[] {0, Double.MIN_NORMAL}); + points.add(new VectorWritable(delta.clone())); + points.add(new VectorWritable(delta.clone())); + points.add(new VectorWritable(delta.clone())); + points.add(new VectorWritable(delta.clone())); + points.add(new VectorWritable(delta.clone())); + representativePoints.put(cluster.getId(), points); + CDbwEvaluator evaluator = new CDbwEvaluator(representativePoints, clusters, measure); + System.out.println("CDbw = " + evaluator.getCDbw()); + System.out.println("Intra-cluster density = " + evaluator.intraClusterDensity()); + System.out.println("Inter-cluster density = " + evaluator.interClusterDensity()); + System.out.println("Separation = " + evaluator.separation()); + } + + @Test + public void testCanopy() throws Exception { + ClusteringTestUtils.writePointsToFile(sampleData, getTestTempFilePath("testdata/file1"), fs, conf); + DistanceMeasure measure = new EuclideanDistanceMeasure(); + CanopyDriver.run(getConfiguration(), testdata, output, measure, 3.1, 2.1, true, 0.0, true); + int numIterations = 10; + Path clustersIn = new Path(output, "clusters-0-final"); + RepresentativePointsDriver.run(conf, clustersIn, new Path(output, "clusteredPoints"), output, measure, + numIterations, true); + CDbwEvaluator evaluator = new CDbwEvaluator(conf, clustersIn); + // printRepPoints(numIterations); + // now print out the Results + System.out.println("Canopy CDbw = " + evaluator.getCDbw()); + System.out.println("Intra-cluster density = " + evaluator.intraClusterDensity()); + System.out.println("Inter-cluster density = " + evaluator.interClusterDensity()); + System.out.println("Separation = " + evaluator.separation()); + } + + @Test + public void testKmeans() throws Exception { + ClusteringTestUtils.writePointsToFile(sampleData, getTestTempFilePath("testdata/file1"), fs, conf); + DistanceMeasure measure = new EuclideanDistanceMeasure(); + // now run the Canopy job to prime kMeans canopies + CanopyDriver.run(getConfiguration(), testdata, output, measure, 3.1, 2.1, false, 0.0, true); + // now run the KMeans job + Path kmeansOutput = new Path(output, "kmeans"); + KMeansDriver.run(testdata, new Path(output, "clusters-0-final"), kmeansOutput, 0.001, 10, true, 0.0, true); + int numIterations = 10; + Path clustersIn = new Path(kmeansOutput, "clusters-10-final"); + RepresentativePointsDriver.run(conf, clustersIn, new Path(kmeansOutput, "clusteredPoints"), kmeansOutput, measure, + numIterations, true); + CDbwEvaluator evaluator = new CDbwEvaluator(conf, clustersIn); + RepresentativePointsDriver.printRepresentativePoints(kmeansOutput, numIterations); + // now print out the Results + System.out.println("K-Means CDbw = " + evaluator.getCDbw()); + System.out.println("Intra-cluster density = " + evaluator.intraClusterDensity()); + System.out.println("Inter-cluster density = " + evaluator.interClusterDensity()); + System.out.println("Separation = " + evaluator.separation()); + } + + @Test + public void testFuzzyKmeans() throws Exception { + ClusteringTestUtils.writePointsToFile(sampleData, 
getTestTempFilePath("testdata/file1"), fs, conf); + DistanceMeasure measure = new EuclideanDistanceMeasure(); + // now run the Canopy job to prime kMeans canopies + CanopyDriver.run(getConfiguration(), testdata, output, measure, 3.1, 2.1, false, 0.0, true); + Path fuzzyKMeansOutput = new Path(output, "fuzzyk"); + // now run the KMeans job + FuzzyKMeansDriver.run(testdata, new Path(output, "clusters-0-final"), fuzzyKMeansOutput, 0.001, 10, 2, + true, true, 0, true); + int numIterations = 10; + Path clustersIn = new Path(fuzzyKMeansOutput, "clusters-4"); + RepresentativePointsDriver.run(conf, clustersIn, new Path(fuzzyKMeansOutput, "clusteredPoints"), fuzzyKMeansOutput, + measure, numIterations, true); + CDbwEvaluator evaluator = new CDbwEvaluator(conf, clustersIn); + RepresentativePointsDriver.printRepresentativePoints(fuzzyKMeansOutput, numIterations); + // now print out the Results + System.out.println("Fuzzy K-Means CDbw = " + evaluator.getCDbw()); + System.out.println("Intra-cluster density = " + evaluator.intraClusterDensity()); + System.out.println("Inter-cluster density = " + evaluator.interClusterDensity()); + System.out.println("Separation = " + evaluator.separation()); + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/test/java/org/apache/mahout/text/MailArchivesClusteringAnalyzerTest.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/test/java/org/apache/mahout/text/MailArchivesClusteringAnalyzerTest.java b/community/mahout-mr/integration/src/test/java/org/apache/mahout/text/MailArchivesClusteringAnalyzerTest.java new file mode 100644 index 0000000..ba73c82 --- /dev/null +++ b/community/mahout-mr/integration/src/test/java/org/apache/mahout/text/MailArchivesClusteringAnalyzerTest.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.text; + +import java.io.Reader; +import java.io.StringReader; + +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.analysis.TokenStream; +import org.apache.lucene.analysis.tokenattributes.CharTermAttribute; +import org.apache.mahout.common.MahoutTestCase; +import org.junit.Test; + +/** + * Unit tests for the MailArchivesClusteringAnalyzer text analyzer. 
+ */ +public class MailArchivesClusteringAnalyzerTest extends MahoutTestCase { + + @Test + public void testAnalysis() throws Exception { + Analyzer analyzer = new MailArchivesClusteringAnalyzer(); + + String text = "A test message\n" + + "atokenthatistoolongtobeusefulforclustertextanalysis\n" + + "Mahout is a scalable, machine-learning LIBRARY\n" + + "we've added some additional stopwords such as html, mailto, regards\t" + + "apache_hadoop provides the foundation for scalability\n" + + "www.nabble.com [email protected]\n" + + "public void int protected package"; + Reader reader = new StringReader(text); + + // if you change the text above, then you may need to change this as well + // order matters too + String[] expectedTokens = { + "test", "mahout", "scalabl", "machin", "learn", "librari", "weve", "ad", + "stopword", "apache_hadoop","provid", "foundat", "scalabl" + }; + + TokenStream tokenStream = analyzer.tokenStream("test", reader); + assertNotNull(tokenStream); + tokenStream.reset(); + CharTermAttribute termAtt = tokenStream.addAttribute(CharTermAttribute.class); + int e = 0; + while (tokenStream.incrementToken() && e < expectedTokens.length) { + assertEquals(expectedTokens[e++], termAtt.toString()); + } + assertEquals(e, expectedTokens.length); + tokenStream.end(); + tokenStream.close(); + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromMailArchivesTest.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromMailArchivesTest.java b/community/mahout-mr/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromMailArchivesTest.java new file mode 100644 index 0000000..ef2b8a6 --- /dev/null +++ b/community/mahout-mr/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromMailArchivesTest.java @@ -0,0 +1,240 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.mahout.text; + +import java.io.File; +import java.io.FileOutputStream; +import java.util.zip.GZIPOutputStream; + +import org.apache.commons.lang3.SystemUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.mahout.common.MahoutTestCase; +import org.apache.mahout.common.Pair; +import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterator; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Test case for the SequenceFilesFromMailArchives command-line application. 
+ */ +public final class SequenceFilesFromMailArchivesTest extends MahoutTestCase { + + private File inputDir; + + /** + * Create the input and output directories needed for testing + * the SequenceFilesFromMailArchives application. + */ + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + inputDir = getTestTempDir("mail-archives-in"); + + // write test mail messages to a gzipped file in a nested directory + File subDir = new File(inputDir, "subdir"); + subDir.mkdir(); + File gzFile = new File(subDir, "mail-messages.gz"); + try (GZIPOutputStream gzOut = new GZIPOutputStream(new FileOutputStream(gzFile))) { + gzOut.write(testMailMessages.getBytes("UTF-8")); + gzOut.finish(); + } + + File subDir2 = new File(subDir, "subsubdir"); + subDir2.mkdir(); + File gzFile2 = new File(subDir2, "mail-messages-2.gz"); + try (GZIPOutputStream gzOut = new GZIPOutputStream(new FileOutputStream(gzFile2))) { + gzOut.write(testMailMessages.getBytes("UTF-8")); + gzOut.finish(); + } + } + + @Test + public void testSequential() throws Exception { + + File outputDir = this.getTestTempDir("mail-archives-out"); + + String[] args = { + "--input", inputDir.getAbsolutePath(), + "--output", outputDir.getAbsolutePath(), + "--charset", "UTF-8", + "--keyPrefix", "TEST", + "--method", "sequential", + "--body", "--subject", "--separator", "" + }; + + // run the application's main method + SequenceFilesFromMailArchives.main(args); + + // app should create a single SequenceFile named "chunk-0" in the output dir + File expectedChunkFile = new File(outputDir, "chunk-0"); + String expectedChunkPath = expectedChunkFile.getAbsolutePath(); + Assert.assertTrue("Expected chunk file " + expectedChunkPath + " not found!", expectedChunkFile.isFile()); + + Configuration conf = getConfiguration(); + SequenceFileIterator<Text, Text> iterator = new SequenceFileIterator<>(new Path(expectedChunkPath), true, conf); + Assert.assertTrue("First key/value pair not found!", iterator.hasNext()); + Pair<Text, Text> record = iterator.next(); + + File parentFile = new File(new File(new File("TEST"), "subdir"), "mail-messages.gz"); + Assert.assertEquals(new File(parentFile, testVars[0][0]).toString(), record.getFirst().toString()); + Assert.assertEquals(testVars[0][1] + testVars[0][2], record.getSecond().toString()); + + Assert.assertTrue("Second key/value pair not found!", iterator.hasNext()); + + record = iterator.next(); + Assert.assertEquals(new File(parentFile, testVars[1][0]).toString(), record.getFirst().toString()); + Assert.assertEquals(testVars[1][1] + testVars[1][2], record.getSecond().toString()); + + record = iterator.next(); + File parentFileSubSubDir = new File(new File(new File(new File("TEST"), "subdir"), "subsubdir"), "mail-messages-2.gz"); + Assert.assertEquals(new File(parentFileSubSubDir, testVars[0][0]).toString(), record.getFirst().toString()); + Assert.assertEquals(testVars[0][1] + testVars[0][2], record.getSecond().toString()); + + Assert.assertTrue("Fourth key/value pair not found!", iterator.hasNext()); + record = iterator.next(); + Assert.assertEquals(new File(parentFileSubSubDir, testVars[1][0]).toString(), record.getFirst().toString()); + Assert.assertEquals(testVars[1][1] + testVars[1][2], record.getSecond().toString()); + + Assert.assertFalse("Only four key/value pairs expected!", iterator.hasNext()); + } + + @Test + public void testMapReduce() throws Exception { + + Path tmpDir = getTestTempDirPath(); + Path mrOutputDir = new Path(tmpDir, "mail-archives-out-mr"); + Configuration configuration =
getConfiguration(); + FileSystem fs = FileSystem.get(configuration); + + File expectedInputFile = new File(inputDir.toString()); + + String[] args = { + "-Dhadoop.tmp.dir=" + configuration.get("hadoop.tmp.dir"), + "--input", expectedInputFile.getAbsolutePath(), + "--output", mrOutputDir.toString(), + "--charset", "UTF-8", + "--keyPrefix", "TEST", + "--method", "mapreduce", + "--body", "--subject", "--separator", "" + }; + + // run the application's main method + SequenceFilesFromMailArchives.main(args); + + // app should create a single SequenceFile named "part-m-00000" in the output dir + FileStatus[] fileStatuses = fs.listStatus(mrOutputDir.suffix("/part-m-00000")); + assertEquals(1, fileStatuses.length); // only one + assertEquals("part-m-00000", fileStatuses[0].getPath().getName()); + SequenceFileIterator<Text, Text> iterator = + new SequenceFileIterator<>(mrOutputDir.suffix("/part-m-00000"), true, configuration); + + Assert.assertTrue("First key/value pair not found!", iterator.hasNext()); + Pair<Text, Text> record = iterator.next(); + + File parentFileSubSubDir = new File(new File(new File(new File("TEST"), "subdir"), "subsubdir"), "mail-messages-2.gz"); + + String expected = record.getFirst().toString(); + if (SystemUtils.IS_OS_WINDOWS) { + expected = expected.replace("/", "\\"); + } + Assert.assertEquals(new File(parentFileSubSubDir, testVars[0][0]).toString(), expected); + Assert.assertEquals(testVars[0][1] + testVars[0][2], record.getSecond().toString()); + Assert.assertTrue("Second key/value pair not found!", iterator.hasNext()); + + record = iterator.next(); + expected = record.getFirst().toString(); + if (SystemUtils.IS_OS_WINDOWS) { + expected = expected.replace("/", "\\"); + } + Assert.assertEquals(new File(parentFileSubSubDir, testVars[1][0]).toString(), expected); + Assert.assertEquals(testVars[1][1] + testVars[1][2], record.getSecond().toString()); + + // test other file + File parentFile = new File(new File(new File("TEST"), "subdir"), "mail-messages.gz"); + record = iterator.next(); + expected = record.getFirst().toString(); + if (SystemUtils.IS_OS_WINDOWS) { + expected = expected.replace("/", "\\"); + } + Assert.assertEquals(new File(parentFile, testVars[0][0]).toString(), expected); + Assert.assertEquals(testVars[0][1] + testVars[0][2], record.getSecond().toString()); + Assert.assertTrue("Fourth key/value pair not found!", iterator.hasNext()); + + record = iterator.next(); + expected = record.getFirst().toString(); + if (SystemUtils.IS_OS_WINDOWS) { + expected = expected.replace("/", "\\"); + } + Assert.assertEquals(new File(parentFile, testVars[1][0]).toString(), expected); + Assert.assertEquals(testVars[1][1] + testVars[1][2], record.getSecond().toString()); + Assert.assertFalse("Only four key/value pairs expected!", iterator.hasNext()); + } + + // Messages extracted and made anonymous from the ASF mail archives + private static final String[][] testVars = { + new String[] { + "[email protected]", + "Ant task for JDK1.1 collections build option", + "\nThis is just a test message\n--\nTesty McTester\n" + }, + new String[] { + "[email protected]", + "Problem with build files in several directories", + "\nHi all,\nThis is another test message.\nRegards,\nAnother Test\n" + } + }; + + private static final String testMailMessages = + "From [email protected] Mon Jul 24 19:13:53 2000\n" + + "Return-Path: <[email protected]>\n" + + "Mailing-List: contact [email protected]; run by ezmlm\n" + + "Delivered-To: mailing list [email protected]\n" + + "Received: (qmail 49267 invoked
from network); 24 Jul 2000 19:13:53 -0000\n" + + "Message-ID: <" + testVars[0][0] + ">\n" + + "From: \"Testy McTester\" <[email protected]>\n" + + "To: <[email protected]>\n" + + "Subject: " + testVars[0][1] + '\n' + + "Date: Mon, 24 Jul 2000 12:24:56 -0700\n" + + "MIME-Version: 1.0\n" + + "Content-Type: text/plain;\n" + + " charset=\"Windows-1252\"\n" + + "Content-Transfer-Encoding: 7bit\n" + + "X-Spam-Rating: locus.apache.org 1.6.2 0/1000/N\n" + + testVars[0][2] + '\n' + + "From [email protected] Wed Jul 26 11:32:16 2000\n" + + "Return-Path: <[email protected]>\n" + + "Mailing-List: contact [email protected]; run by ezmlm\n" + + "Delivered-To: mailing list [email protected]\n" + + "Received: (qmail 73966 invoked from network); 26 Jul 2000 11:32:16 -0000\n" + + "User-Agent: Microsoft-Outlook-Express-Macintosh-Edition/5.02.2022\n" + + "Date: Wed, 26 Jul 2000 13:32:08 +0200\n" + + "Subject: " + testVars[1][1] + '\n' + + "From: Another Test <[email protected]>\n" + + "To: <[email protected]>\n" + + "Message-Id: <" + testVars[1][0] + ">\n" + + "Mime-Version: 1.0\n" + + "Content-Type: text/plain; charset=\"US-ASCII\"\n" + + "Content-Transfer-Encoding: 7bit\n" + + "X-Spam-Rating: locus.apache.org 1.6.2 0/1000/N\n" + + testVars[1][2]; +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/test/java/org/apache/mahout/text/TestPathFilter.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/test/java/org/apache/mahout/text/TestPathFilter.java b/community/mahout-mr/integration/src/test/java/org/apache/mahout/text/TestPathFilter.java new file mode 100644 index 0000000..227521a --- /dev/null +++ b/community/mahout-mr/integration/src/test/java/org/apache/mahout/text/TestPathFilter.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.mahout.text; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; + +/** + * Dummy Path Filter for testing the MapReduce version of + * SequenceFilesFromDirectory + */ +public class TestPathFilter implements PathFilter { + + @Override + public boolean accept(Path path) { + return path.getName().startsWith("t") || path.getName().startsWith("r") || path.getName().startsWith("f"); + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java b/community/mahout-mr/integration/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java new file mode 100644 index 0000000..040c8e4 --- /dev/null +++ b/community/mahout-mr/integration/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java @@ -0,0 +1,313 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.text; + +import java.io.File; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.io.Charsets; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.mahout.common.HadoopUtil; +import org.apache.mahout.common.MahoutTestCase; +import org.apache.mahout.common.Pair; +import org.apache.mahout.common.iterator.sequencefile.PathFilters; +import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterator; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class TestSequenceFilesFromDirectory extends MahoutTestCase { + + private static final Logger logger = LoggerFactory.getLogger(TestSequenceFilesFromDirectory.class); + + private static final String[][] DATA1 = { + {"test1", "This is the first text."}, + {"test2", "This is the second text."}, + {"test3", "This is the third text."} + }; + + private static final String[][] DATA2 = { + {"recursive_test1", "This is the first text."}, + {"recursive_test2", "This is the second text."}, + {"recursive_test3", "This is the third text."} + }; + + @Test + public void testSequenceFileFromDirectoryBasic() throws Exception { + // parameters + Configuration configuration = getConfiguration(); + + FileSystem fs = FileSystem.get(configuration); + + // create + Path tmpDir = this.getTestTempDirPath(); + Path inputDir = new Path(tmpDir, "inputDir"); + fs.mkdirs(inputDir); + + Path outputDir = new Path(tmpDir, "outputDir"); + Path outputDirRecursive = new Path(tmpDir, "outputDirRecursive"); + + Path inputDirRecursive = new Path(tmpDir, "inputDirRecur"); + fs.mkdirs(inputDirRecursive); + + // prepare input files + createFilesFromArrays(configuration, inputDir, DATA1); + + SequenceFilesFromDirectory.main(new String[]{ + "--input", inputDir.toString(), + "--output", outputDir.toString(), + "--chunkSize", "64", + "--charset", Charsets.UTF_8.name(), + "--keyPrefix", "UID", + "--method", "sequential"}); + + // check output chunk files + checkChunkFiles(configuration, outputDir, DATA1, "UID"); + + createRecursiveDirFilesFromArrays(configuration, inputDirRecursive, DATA2); + + FileStatus fstInputPath = fs.getFileStatus(inputDirRecursive); + String dirs = HadoopUtil.buildDirList(fs, fstInputPath); + + System.out.println("\n\n ----- recursive dirs: " + dirs); + SequenceFilesFromDirectory.main(new String[]{ + "--input", inputDirRecursive.toString(), + "--output", outputDirRecursive.toString(), + "--chunkSize", "64", + "--charset", Charsets.UTF_8.name(), + "--keyPrefix", "UID", + "--method", "sequential"}); + + checkRecursiveChunkFiles(configuration, outputDirRecursive, DATA2, "UID"); + } + + @Test + public void testSequenceFileFromDirectoryMapReduce() throws Exception { + + Configuration conf = getConfiguration(); + + FileSystem fs = FileSystem.get(conf); + + // create + Path tmpDir = this.getTestTempDirPath(); + Path inputDir = new Path(tmpDir, "inputDir"); + fs.mkdirs(inputDir); + + Path inputDirRecur = new Path(tmpDir, "inputDirRecur"); + fs.mkdirs(inputDirRecur); + + Path mrOutputDir = new Path(tmpDir, "mrOutputDir"); + Path mrOutputDirRecur = new Path(tmpDir, "mrOutputDirRecur"); + + createFilesFromArrays(conf, inputDir, DATA1); + + SequenceFilesFromDirectory.main(new String[]{ + "-Dhadoop.tmp.dir=" + conf.get("hadoop.tmp.dir"), + 
"--input", inputDir.toString(), + "--output", mrOutputDir.toString(), + "--chunkSize", "64", + "--charset", Charsets.UTF_8.name(), + "--method", "mapreduce", + "--keyPrefix", "UID", + "--fileFilterClass", "org.apache.mahout.text.TestPathFilter" + }); + + checkMRResultFiles(conf, mrOutputDir, DATA1, "UID"); + + createRecursiveDirFilesFromArrays(conf, inputDirRecur, DATA2); + + FileStatus fst_input_path = fs.getFileStatus(inputDirRecur); + String dirs = HadoopUtil.buildDirList(fs, fst_input_path); + + logger.info("\n\n ---- recursive dirs: {}", dirs); + + SequenceFilesFromDirectory.main(new String[]{ + "-Dhadoop.tmp.dir=" + conf.get("hadoop.tmp.dir"), + "--input", inputDirRecur.toString(), + "--output", mrOutputDirRecur.toString(), + "--chunkSize", "64", + "--charset", Charsets.UTF_8.name(), + "--method", "mapreduce", + "--keyPrefix", "UID", + "--fileFilterClass", "org.apache.mahout.text.TestPathFilter" + }); + + checkMRResultFilesRecursive(conf, mrOutputDirRecur, DATA2, "UID"); + } + + + private static void createFilesFromArrays(Configuration conf, Path inputDir, String[][] data) throws IOException { + FileSystem fs = FileSystem.get(conf); + for (String[] aData : data) { + try (OutputStreamWriter writer = + new OutputStreamWriter(fs.create(new Path(inputDir, aData[0])), Charsets.UTF_8)){ + writer.write(aData[1]); + } + } + } + + private static void createRecursiveDirFilesFromArrays(Configuration configuration, Path inputDir, + String[][] data) throws IOException { + FileSystem fs = FileSystem.get(configuration); + + logger.info("creativeRecursiveDirFilesFromArrays > based on: {}", inputDir.toString()); + Path curPath; + String currentRecursiveDir = inputDir.toString(); + + for (String[] aData : data) { + currentRecursiveDir += "/" + aData[0]; + File subDir = new File(currentRecursiveDir); + subDir.mkdir(); + + curPath = new Path(subDir.toString(), "file.txt"); + logger.info("Created file: {}", curPath.toString()); + + try (OutputStreamWriter writer = new OutputStreamWriter(fs.create(curPath), Charsets.UTF_8)){ + writer.write(aData[1]); + } + } + } + + private static void checkChunkFiles(Configuration configuration, + Path outputDir, + String[][] data, + String prefix) throws IOException { + FileSystem fs = FileSystem.get(configuration); + + // output exists? + FileStatus[] fileStatuses = fs.listStatus(outputDir, PathFilters.logsCRCFilter()); + assertEquals(1, fileStatuses.length); // only one + assertEquals("chunk-0", fileStatuses[0].getPath().getName()); + + Map<String, String> fileToData = new HashMap<>(); + for (String[] aData : data) { + fileToData.put(prefix + Path.SEPARATOR + aData[0], aData[1]); + } + + // read a chunk to check content + try (SequenceFileIterator<Text, Text> iterator = + new SequenceFileIterator<>(fileStatuses[0].getPath(), true, configuration)){ + while (iterator.hasNext()) { + Pair<Text, Text> record = iterator.next(); + String retrievedData = fileToData.get(record.getFirst().toString().trim()); + assertNotNull(retrievedData); + assertEquals(retrievedData, record.getSecond().toString().trim()); + } + } + } + + private static void checkRecursiveChunkFiles(Configuration configuration, + Path outputDir, + String[][] data, + String prefix) throws IOException { + FileSystem fs = FileSystem.get(configuration); + + System.out.println(" ----------- check_Recursive_ChunkFiles ------------"); + + // output exists? 
+ FileStatus[] fileStatuses = fs.listStatus(outputDir, PathFilters.logsCRCFilter()); + assertEquals(1, fileStatuses.length); // only one + assertEquals("chunk-0", fileStatuses[0].getPath().getName()); + + + Map<String, String> fileToData = new HashMap<>(); + String currentPath = prefix; + for (String[] aData : data) { + currentPath += Path.SEPARATOR + aData[0]; + fileToData.put(currentPath + Path.SEPARATOR + "file.txt", aData[1]); + } + + // read a chunk to check content + try (SequenceFileIterator<Text, Text> iterator = + new SequenceFileIterator<>(fileStatuses[0].getPath(), true, configuration)) { + while (iterator.hasNext()) { + Pair<Text, Text> record = iterator.next(); + String retrievedData = fileToData.get(record.getFirst().toString().trim()); + System.out.printf("%s >> %s\n", record.getFirst().toString().trim(), record.getSecond().toString().trim()); + + assertNotNull(retrievedData); + assertEquals(retrievedData, record.getSecond().toString().trim()); + System.out.printf(">>> k: %s, v: %s\n", record.getFirst().toString(), record.getSecond().toString()); + } + } + } + + private static void checkMRResultFiles(Configuration conf, Path outputDir, + String[][] data, String prefix) throws IOException { + FileSystem fs = FileSystem.get(conf); + + // output exists? + FileStatus[] fileStatuses = fs.listStatus(outputDir.suffix("/part-m-00000"), PathFilters.logsCRCFilter()); + assertEquals(1, fileStatuses.length); // only one + assertEquals("part-m-00000", fileStatuses[0].getPath().getName()); + Map<String, String> fileToData = new HashMap<>(); + for (String[] aData : data) { + System.out.printf("map.put: %s %s\n", prefix + Path.SEPARATOR + aData[0], aData[1]); + fileToData.put(prefix + Path.SEPARATOR + aData[0], aData[1]); + } + + // read a chunk to check content + try (SequenceFileIterator<Text, Text> iterator = new SequenceFileIterator<>( + fileStatuses[0].getPath(), true, conf)) { + while (iterator.hasNext()) { + Pair<Text, Text> record = iterator.next(); + String retrievedData = fileToData.get(record.getFirst().toString().trim()); + + System.out.printf("MR> %s >> %s\n", record.getFirst().toString().trim(), record.getSecond().toString().trim()); + assertNotNull(retrievedData); + assertEquals(retrievedData, record.getSecond().toString().trim()); + } + } + } + + private static void checkMRResultFilesRecursive(Configuration configuration, Path outputDir, + String[][] data, String prefix) throws IOException { + FileSystem fs = FileSystem.get(configuration); + + // output exists? + FileStatus[] fileStatuses = fs.listStatus(outputDir.suffix("/part-m-00000"), PathFilters.logsCRCFilter()); + assertEquals(1, fileStatuses.length); // only one + assertEquals("part-m-00000", fileStatuses[0].getPath().getName()); + Map<String, String> fileToData = new HashMap<>(); + String currentPath = prefix; + + for (String[] aData : data) { + currentPath += Path.SEPARATOR + aData[0]; + fileToData.put(currentPath + Path.SEPARATOR + "file.txt", aData[1]); + } + + // read a chunk to check content + try (SequenceFileIterator<Text, Text> iterator = new SequenceFileIterator<>( + fileStatuses[0].getPath(), true, configuration)){ + while (iterator.hasNext()) { + Pair<Text, Text> record = iterator.next(); + System.out.printf("MR-Recur > Trying to check: %s\n", record.getFirst().toString().trim()); + String retrievedData = fileToData.get(record.getFirst().toString().trim()); + assertNotNull(retrievedData); + assertEquals(retrievedData, record.getSecond().toString().trim()); + } + } + } +}
