Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java?rev=1003188&r1=1003187&r2=1003188&view=diff ============================================================================== --- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java (original) +++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java Thu Sep 30 18:00:23 2010 @@ -145,7 +145,7 @@ public class DistributedRowMatrix implem if (numRows != other.numRows()) { throw new CardinalityException(numRows, other.numRows()); } - Path outPath = new Path(outputTmpBasePath.getParent(), "productWith"); + Path outPath = new Path(outputTmpBasePath.getParent(), "productWith-" + (System.nanoTime() & 0xFF)); JobConf conf = MatrixMultiplicationJob.createMatrixMultiplyJobConf(rowPath, other.rowPath, outPath, other.numCols); JobClient.runJob(conf); DistributedRowMatrix out = new DistributedRowMatrix(outPath, outputTmpPath, numCols, other.numCols());
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/DistributedLanczosSolver.java URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/DistributedLanczosSolver.java?rev=1003188&r1=1003187&r2=1003188&view=diff ============================================================================== --- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/DistributedLanczosSolver.java (original) +++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/DistributedLanczosSolver.java Thu Sep 30 18:00:23 2010 @@ -62,6 +62,34 @@ public class DistributedLanczosSolver ex initialVector.assign(1.0 / Math.sqrt(corpus.numCols())); return initialVector; } + + /** + * Factored-out LanczosSolver for the purpose of invoking it programmatically + * @param originalConfig + * @param inputPath + * @param outputTmpPath + * @param numRows + * @param numCols + * @param isSymmetric + * @param desiredRank + * @param eigenVectors + * @param eigenValues + * @param outputEigenVectorPathString + * @throws IOException + */ + public void runJob(Configuration originalConfig, Path inputPath, + Path outputTmpPath, int numRows, int numCols, + boolean isSymmetric, int desiredRank, Matrix eigenVectors, + List<Double> eigenValues, String outputEigenVectorPathString) + throws IOException { + DistributedRowMatrix matrix = new DistributedRowMatrix( + inputPath, outputTmpPath, + numRows, numCols); + matrix.configure(new JobConf(originalConfig)); + setConf(originalConfig); + solve(matrix, desiredRank, eigenVectors, eigenValues, isSymmetric); + serializeOutput(eigenVectors, eigenValues, new Path(outputEigenVectorPathString)); + } @Override public int run(String[] strings) throws Exception { @@ -164,7 +192,6 @@ public class DistributedLanczosSolver ex } /** - * TODO: this should be refactored to allow both LanczosSolver impls to properly serialize output in a generic way. 
* @param eigenVectors The eigenvectors to be serialized * @param eigenValues The eigenvalues to be serialized * @param outputPath The path (relative to the current Configuration's FileSystem) to save the output to. Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java?rev=1003188&r1=1003187&r2=1003188&view=diff ============================================================================== --- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java (original) +++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java Thu Sep 30 18:00:23 2010 @@ -90,6 +90,10 @@ public class EigenVerificationJob extend private JobConf conf; + private int maxEigensToKeep; + + private Path cleanedEigensPath; + public void setEigensToVerify(VectorIterable eigens) { eigensToVerify = eigens; } @@ -102,20 +106,14 @@ public class EigenVerificationJob extend } else if (argMap.isEmpty()) { return 0; } - outPath = getOutputPath(); - tmpOut = new Path(outPath, "tmp"); - - Path eigenInput = null; - boolean inMemory = false; - if (argMap.get("--eigenInput") != null) { - eigenInput = new Path(argMap.get("--eigenInput")); - inMemory = argMap.get("--inMemory") != null; - } - Path corpusInput = new Path(argMap.get("--corpusInput")); - - run(corpusInput, eigenInput, outPath, tmpOut, Double.parseDouble(argMap.get("--maxError")), Double.parseDouble(argMap - .get("--minEigenvalue")), inMemory, new JobConf(getConf())); - + // parse out the arguments + runJob(new Path(argMap.get("--eigenInput")), + new Path(argMap.get("--corpusInput")), + getOutputPath(), + argMap.get("--inMemory") != null, + Double.parseDouble(argMap.get("--maxError")), + Double.parseDouble(argMap.get("--minEigenvalue")), + Integer.parseInt(argMap.get("--maxEigens"))); 
return 0; } @@ -132,13 +130,13 @@ public class EigenVerificationJob extend * @throws IOException */ public int run(Path corpusInput, - Path eigenInput, - Path output, - Path tempOut, - double maxError, - double minEigenValue, - boolean inMemory, - JobConf config) throws IOException { + Path eigenInput, + Path output, + Path tempOut, + double maxError, + double minEigenValue, + boolean inMemory, + JobConf config) throws IOException { this.outPath = output; this.tmpOut = tempOut; this.maxError = maxError; @@ -200,8 +198,19 @@ public class EigenVerificationJob extend VectorWritable vw = new VectorWritable(ev); iw.set(s.index()); seqWriter.append(iw, vw); + + int numEigensWritten = 0; + // increment the number of eigenvectors written and see if we've + // reached our specified limit, or if we wish to write all eigenvectors + // (latter is built-in, since numEigensWritten will always be > 0) + numEigensWritten++; + if (numEigensWritten == maxEigensToKeep) { + log.info("{} of the {} total eigens have been written", new Integer(maxEigensToKeep), new Integer(prunedEigenMeta.size())); + break; + } } seqWriter.close(); + cleanedEigensPath = path; } private List<Map.Entry<MatrixSlice, EigenStatus>> pruneEigens(Map<MatrixSlice, EigenStatus> eigenMetaData) { @@ -248,7 +257,54 @@ public class EigenVerificationJob extend } } + public Path getCleanedEigensPath() { + return cleanedEigensPath; + } + public static void main(String[] args) throws Exception { ToolRunner.run(new EigenVerificationJob(), args); } + + /** + * Programmatic invocation of run() + * @param eigenInput Output of LanczosSolver + * @param corpusInput Input of LanczosSolver + * @param output + * @param inMemory + * @param maxError + * @param minEigenValue + * @param maxEigens + */ + public void runJob(Path eigenInput, + Path corpusInput, + Path output, + boolean inMemory, + double maxError, + double minEigenValue, + int maxEigens) throws IOException { + // no need to handle command line arguments + outPath = output; 
+ tmpOut = new Path(outPath, "tmp"); + maxEigensToKeep = maxEigens; + this.maxError = maxError; + if (getConf() == null) { + setConf(new Configuration()); + } + if (eigenInput != null && eigensToVerify == null) { + prepareEigens(eigenInput, inMemory); + } + + DistributedRowMatrix c = new DistributedRowMatrix(corpusInput, tmpOut, 1, 1); + c.configure(new JobConf(getConf())); + corpus = c; + + eigenVerifier = new SimpleEigenVerifier(); + OrthonormalityVerifier orthoVerifier = new OrthonormalityVerifier(); + VectorIterable pairwiseInnerProducts = computePairwiseInnerProducts(); + // FIXME: Why is the above vector computed if it is never used? + + Map<MatrixSlice, EigenStatus> eigenMetaData = verifyEigens(); + List<Map.Entry<MatrixSlice, EigenStatus>> prunedEigenMeta = pruneEigens(eigenMetaData); + saveCleanEigens(prunedEigenMeta); + } } Added: mahout/trunk/core/src/test/java/org/apache/mahout/clustering/spectral/common/TestAffinityMatrixInputJob.java URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/spectral/common/TestAffinityMatrixInputJob.java?rev=1003188&view=auto ============================================================================== --- mahout/trunk/core/src/test/java/org/apache/mahout/clustering/spectral/common/TestAffinityMatrixInputJob.java (added) +++ mahout/trunk/core/src/test/java/org/apache/mahout/clustering/spectral/common/TestAffinityMatrixInputJob.java Thu Sep 30 18:00:23 2010 @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.clustering.spectral.common; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.mahout.clustering.spectral.eigencuts.EigencutsKeys; +import org.apache.mahout.common.DummyRecordWriter; +import org.apache.mahout.common.MahoutTestCase; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorWritable; +import org.apache.mahout.math.hadoop.DistributedRowMatrix; +import org.apache.mahout.math.hadoop.DistributedRowMatrix.MatrixEntryWritable; +import org.junit.Test; + +/** + * <p>Tests the affinity matrix input M/R task.</p> + * + * <p>The tricky item with this task is that the format of the input + * must be correct; it must take the form of a graph input, and for the + * current implementation, the input must be symmetric, e.g. the weight + * from node A to B = the weight from node B to A. This is not explicitly + * enforced within the task itself (since, as of the time these tests were + * written, we have not yet decided on a final rule regarding the + * symmetry/non-symmetry of the affinity matrix, so we are unofficially + * enforcing symmetry). Input looks something like this:</p> + * + * <pre>0, 0, 0 + * 0, 1, 10 + * 0, 2, 20 + * ... 
+ * 1, 0, 10 + * 2, 0, 20 + * ...</pre> + * + * <p>The mapper's task is simply to convert each line of text into a + * DistributedRowMatrix entry, allowing the reducer to join each entry + * of the same row into a VectorWritable.</p> + * + * <p>Exceptions are thrown in cases of bad input format: if there are + * more or fewer than 3 numbers per line, or any of the numbers are missing. + */ +public class TestAffinityMatrixInputJob extends MahoutTestCase { + + private String [] raw = {"0,0,0", "0,1,5", "0,2,10", "1,0,5", "1,1,0", + "1,2,20", "2,0,10", "2,1,20", "2,2,0"}; + private int rawDimensions = 3; + + @Test + public void testAffinityMatrixInputMapper() throws Exception { + AffinityMatrixInputMapper mapper = new AffinityMatrixInputMapper(); + Configuration conf = new Configuration(); + conf.setInt(EigencutsKeys.AFFINITY_DIMENSIONS, rawDimensions); + + // set up the dummy writer and the M/R context + DummyRecordWriter<IntWritable, DistributedRowMatrix.MatrixEntryWritable> writer = + new DummyRecordWriter<IntWritable, DistributedRowMatrix.MatrixEntryWritable>(); + Mapper<LongWritable, Text, IntWritable, DistributedRowMatrix.MatrixEntryWritable>.Context + context = DummyRecordWriter.build(mapper, conf, writer); + + // loop through all the points and test each one is converted + // successfully to a DistributedRowMatrix.MatrixEntry + for (int i = 0; i < raw.length; i++) { + mapper.map(new LongWritable(), new Text(raw[i]), context); + } + + // test the data was successfully constructed + assertEquals("Number of map results", rawDimensions, writer.getData().size()); + Set<IntWritable> keys = writer.getData().keySet(); + for (IntWritable i : keys) { + List<MatrixEntryWritable> row = writer.getData().get(i); + assertEquals("Number of items in row", rawDimensions, row.size()); + } + } + + @Test + public void testAffinitymatrixInputReducer() throws Exception { + AffinityMatrixInputMapper mapper = new AffinityMatrixInputMapper(); + Configuration conf = new Configuration(); 
+ conf.setInt(EigencutsKeys.AFFINITY_DIMENSIONS, rawDimensions); + + // set up the dummy writer and the M/R context + DummyRecordWriter<IntWritable, DistributedRowMatrix.MatrixEntryWritable> mapWriter = + new DummyRecordWriter<IntWritable, DistributedRowMatrix.MatrixEntryWritable>(); + Mapper<LongWritable, Text, IntWritable, DistributedRowMatrix.MatrixEntryWritable>.Context + mapContext = DummyRecordWriter.build(mapper, conf, mapWriter); + + // loop through all the points and test each one is converted + // successfully to a DistributedRowMatrix.MatrixEntry + for (int i = 0; i < raw.length; i++) { + mapper.map(new LongWritable(), new Text(raw[i]), mapContext); + } + // store the data for checking later + Map<IntWritable, List<MatrixEntryWritable>> map = mapWriter.getData(); + + // now reduce the data + AffinityMatrixInputReducer reducer = new AffinityMatrixInputReducer(); + DummyRecordWriter<IntWritable, VectorWritable> redWriter = + new DummyRecordWriter<IntWritable, VectorWritable>(); + Reducer<IntWritable, DistributedRowMatrix.MatrixEntryWritable, + IntWritable, VectorWritable>.Context redContext = DummyRecordWriter + .build(reducer, conf, redWriter, IntWritable.class, MatrixEntryWritable.class); + for (IntWritable key : mapWriter.getKeys()) { + reducer.reduce(key, mapWriter.getValue(key), redContext); + } + + // check that all the elements are correctly ordered + assertEquals("Number of reduce results", rawDimensions, redWriter.getData().size()); + for (IntWritable row : redWriter.getKeys()) { + List<VectorWritable> list = redWriter.getValue(row); + assertEquals("Should only be one vector", 1, list.size()); + // check that the elements in the array are correctly ordered + Vector v = list.get(0).get(); + for (Vector.Element e : v) { + // find this value in the original map + MatrixEntryWritable toCompare = new MatrixEntryWritable(); + toCompare.setRow(-1); + toCompare.setCol(e.index()); + toCompare.setVal(e.get()); + assertTrue("This entry was correctly placed 
in its row", map.get(row).contains(toCompare)); + } + } + } +} Added: mahout/trunk/core/src/test/java/org/apache/mahout/clustering/spectral/common/TestMatrixDiagonalizeJob.java URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/spectral/common/TestMatrixDiagonalizeJob.java?rev=1003188&view=auto ============================================================================== --- mahout/trunk/core/src/test/java/org/apache/mahout/clustering/spectral/common/TestMatrixDiagonalizeJob.java (added) +++ mahout/trunk/core/src/test/java/org/apache/mahout/clustering/spectral/common/TestMatrixDiagonalizeJob.java Thu Sep 30 18:00:23 2010 @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.clustering.spectral.common; + +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.mahout.clustering.spectral.common.MatrixDiagonalizeJob.MatrixDiagonalizeMapper; +import org.apache.mahout.clustering.spectral.common.MatrixDiagonalizeJob.MatrixDiagonalizeReducer; +import org.apache.mahout.clustering.spectral.eigencuts.EigencutsKeys; +import org.apache.mahout.common.DummyRecordWriter; +import org.apache.mahout.common.MahoutTestCase; +import org.apache.mahout.math.RandomAccessSparseVector; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorWritable; +import org.junit.Test; + +/** + * <p>The MatrixDiagonalize task is pretty simple: given a matrix, + * it sums the elements of the row, and sticks the sum in position (i, i) + * of a new matrix of identical dimensions to the original.</p> + */ +public class TestMatrixDiagonalizeJob extends MahoutTestCase { + + private double [][] raw = { {1, 2, 3}, {4, 5, 6}, {7, 8, 9} }; + private int rawDimensions = 3; + + private static double rowSum(double [] row) { + double sum = 0; + for (int i = 0; i < row.length; i++) { sum += row[i]; } + return sum; + } + + @Test + public void testMatrixDiagonalizeMapper() throws Exception { + MatrixDiagonalizeMapper mapper = new MatrixDiagonalizeMapper(); + Configuration conf = new Configuration(); + conf.setInt(EigencutsKeys.AFFINITY_DIMENSIONS, rawDimensions); + + // set up the dummy writers + DummyRecordWriter<NullWritable, IntDoublePairWritable> writer = + new DummyRecordWriter<NullWritable, IntDoublePairWritable>(); + Mapper<IntWritable, VectorWritable, NullWritable, IntDoublePairWritable>.Context + context = DummyRecordWriter.build(mapper, conf, writer); + + // perform the mapping + for (int i = 0; i < rawDimensions; i++) { 
+ RandomAccessSparseVector toAdd = new RandomAccessSparseVector(rawDimensions); + toAdd.assign(raw[i]); + mapper.map(new IntWritable(i), new VectorWritable(toAdd), context); + } + + // check the number of the results + assertEquals("Number of map results", rawDimensions, + writer.getValue(NullWritable.get()).size()); + } + + @Test + public void testMatrixDiagonalizeReducer() throws Exception { + MatrixDiagonalizeMapper mapper = new MatrixDiagonalizeMapper(); + Configuration conf = new Configuration(); + conf.setInt(EigencutsKeys.AFFINITY_DIMENSIONS, rawDimensions); + + // set up the dummy writers + DummyRecordWriter<NullWritable, IntDoublePairWritable> mapWriter = + new DummyRecordWriter<NullWritable, IntDoublePairWritable>(); + Mapper<IntWritable, VectorWritable, NullWritable, IntDoublePairWritable>.Context + mapContext = DummyRecordWriter.build(mapper, conf, mapWriter); + + // perform the mapping + for (int i = 0; i < rawDimensions; i++) { + RandomAccessSparseVector toAdd = new RandomAccessSparseVector(rawDimensions); + toAdd.assign(raw[i]); + mapper.map(new IntWritable(i), new VectorWritable(toAdd), mapContext); + } + + // now perform the reduction + MatrixDiagonalizeReducer reducer = new MatrixDiagonalizeReducer(); + DummyRecordWriter<NullWritable, VectorWritable> redWriter = new + DummyRecordWriter<NullWritable, VectorWritable>(); + Reducer<NullWritable, IntDoublePairWritable, NullWritable, VectorWritable>.Context + redContext = DummyRecordWriter.build(reducer, conf, redWriter, + NullWritable.class, IntDoublePairWritable.class); + + // only need one reduction + reducer.reduce(NullWritable.get(), mapWriter.getValue(NullWritable.get()), redContext); + + // first, make sure there's only one result + List<VectorWritable> list = redWriter.getValue(NullWritable.get()); + assertEquals("Only a single resulting vector", 1, list.size()); + Vector v = list.get(0).get(); + for (int i = 0; i < v.size(); i++) { + assertEquals("Element sum is correct", rowSum(raw[i]), 
v.get(i),0.01); + } + } +} Added: mahout/trunk/core/src/test/java/org/apache/mahout/clustering/spectral/common/TestUnitVectorizerJob.java URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/spectral/common/TestUnitVectorizerJob.java?rev=1003188&view=auto ============================================================================== --- mahout/trunk/core/src/test/java/org/apache/mahout/clustering/spectral/common/TestUnitVectorizerJob.java (added) +++ mahout/trunk/core/src/test/java/org/apache/mahout/clustering/spectral/common/TestUnitVectorizerJob.java Thu Sep 30 18:00:23 2010 @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.clustering.spectral.common; + +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.mahout.clustering.spectral.common.UnitVectorizerJob.UnitVectorizerMapper; +import org.apache.mahout.common.DummyRecordWriter; +import org.apache.mahout.common.MahoutTestCase; +import org.apache.mahout.math.RandomAccessSparseVector; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorWritable; +import org.junit.Test; + +public class TestUnitVectorizerJob extends MahoutTestCase { + + private double [][] raw = { {1, 2, 3}, {4, 5, 6}, {7, 8, 9} }; + + @Test + public void testUnitVectorizerMapper() throws Exception { + UnitVectorizerMapper mapper = new UnitVectorizerMapper(); + Configuration conf = new Configuration(); + + // set up the dummy writers + DummyRecordWriter<IntWritable, VectorWritable> writer = new + DummyRecordWriter<IntWritable, VectorWritable>(); + Mapper<IntWritable, VectorWritable, IntWritable, VectorWritable>.Context + context = DummyRecordWriter.build(mapper, conf, writer); + + // perform the mapping + for (int i = 0; i < raw.length; i++) { + Vector vector = new RandomAccessSparseVector(raw[i].length); + vector.assign(raw[i]); + mapper.map(new IntWritable(i), new VectorWritable(vector), context); + } + + // check the results + assertEquals("Number of map results", raw.length, writer.getData().size()); + for (int i = 0; i < raw.length; i++) { + IntWritable key = new IntWritable(i); + List<VectorWritable> list = writer.getValue(key); + assertEquals("Only one element per row", 1, list.size()); + Vector v = list.get(0).get(); + assertTrue("Unit vector sum is 1 or differs by 0.0001", Math.abs(v.norm(2) - 1) < 0.000001); + } + } +} Added: mahout/trunk/core/src/test/java/org/apache/mahout/clustering/spectral/common/TestVectorCache.java URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/spectral/common/TestVectorCache.java?rev=1003188&view=auto ============================================================================== --- mahout/trunk/core/src/test/java/org/apache/mahout/clustering/spectral/common/TestVectorCache.java (added) +++ mahout/trunk/core/src/test/java/org/apache/mahout/clustering/spectral/common/TestVectorCache.java Thu Sep 30 18:00:23 2010 @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.clustering.spectral.common; + +import java.net.URI; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.filecache.DistributedCache; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.mahout.common.HadoopUtil; +import org.apache.mahout.common.MahoutTestCase; +import org.apache.mahout.math.DenseVector; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorWritable; +import org.junit.Test; + +public class TestVectorCache extends MahoutTestCase { + + private double [] vector = { 1, 2, 3, 4 }; + + @Test + public void testSave() throws Exception { + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.get(conf); + IntWritable key = new IntWritable(0); + Vector value = new DenseVector(vector); + Path path = getTestTempDirPath("output"); + + // write the vector out + VectorCache.save(key, value, path, conf, true, true); + + // can we read it from here? 
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf); + VectorWritable old = new VectorWritable(); + reader.next(key, old); + reader.close(); + + // test if the values are identical + assertTrue("Saved vector is identical to original", old.get().equals(value)); + } + + @Test + public void testLoad() throws Exception { + // save a vector manually + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.get(conf); + IntWritable key = new IntWritable(0); + Vector value = new DenseVector(vector); + Path path = getTestTempDirPath("output"); + + // write the vector + path = fs.makeQualified(path); + fs.deleteOnExit(path); + HadoopUtil.overwriteOutput(path); + DistributedCache.setCacheFiles(new URI[] {path.toUri()}, conf); + SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path, + IntWritable.class, VectorWritable.class); + writer.append(key, new VectorWritable(value)); + writer.close(); + + // load it + Vector result = VectorCache.load(key, conf); + + // are they the same? + assertTrue("Vector is not null", result != null); + assertTrue("Loaded vector is identical to original", result.equals(value)); + } + + @Test + public void testAll() throws Exception { + Configuration conf = new Configuration(); + Vector v = new DenseVector(vector); + Path toSave = getTestTempDirPath("output"); + IntWritable key = new IntWritable(0); + + // save it + VectorCache.save(key, v, toSave, conf); + + // now, load it back + key = new IntWritable(0); + Vector v2 = VectorCache.load(key, conf); + + // are they the same? 
+ assertTrue("Vector is not null", v2 != null); + assertTrue("Vectors are identical", v2.equals(v)); + } +} Added: mahout/trunk/core/src/test/java/org/apache/mahout/clustering/spectral/common/TestVectorMatrixMultiplicationJob.java URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/spectral/common/TestVectorMatrixMultiplicationJob.java?rev=1003188&view=auto ============================================================================== --- mahout/trunk/core/src/test/java/org/apache/mahout/clustering/spectral/common/TestVectorMatrixMultiplicationJob.java (added) +++ mahout/trunk/core/src/test/java/org/apache/mahout/clustering/spectral/common/TestVectorMatrixMultiplicationJob.java Thu Sep 30 18:00:23 2010 @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.clustering.spectral.common; + +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.mahout.clustering.spectral.common.VectorMatrixMultiplicationJob.VectorMatrixMultiplicationMapper; +import org.apache.mahout.common.DummyRecordWriter; +import org.apache.mahout.common.MahoutTestCase; +import org.apache.mahout.math.DenseVector; +import org.apache.mahout.math.RandomAccessSparseVector; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorWritable; +import org.junit.Test; + +/** + * <p>This test ensures that a Vector can be successfully multiplied + * with a matrix.</p> + */ +public class TestVectorMatrixMultiplicationJob extends MahoutTestCase { + + private double [][] matrix = { {1, 1}, {2, 3} }; + private double [] vector = {9, 16}; + + @Test + public void testVectorMatrixMultiplicationMapper() throws Exception { + VectorMatrixMultiplicationMapper mapper = new VectorMatrixMultiplicationMapper(); + Configuration conf = new Configuration(); + + // set up all the parameters for the job + Vector toSave = new DenseVector(vector); + DummyRecordWriter<IntWritable, VectorWritable> writer = new + DummyRecordWriter<IntWritable, VectorWritable>(); + Mapper<IntWritable, VectorWritable, IntWritable, VectorWritable>.Context + context = DummyRecordWriter.build(mapper, conf, writer); + mapper.setup(toSave); + + // run the job + for (int i = 0; i < matrix.length; i++) { + Vector v = new RandomAccessSparseVector(matrix[i].length); + v.assign(matrix[i]); + mapper.map(new IntWritable(i), new VectorWritable(v), context); + } + + // check the results + assertEquals("Number of map results", matrix.length, writer.getData().size()); + for (int i = 0; i < matrix.length; i++) { + List<VectorWritable> list = writer.getValue(new IntWritable(i)); + assertEquals("Only one vector per key", 1, list.size()); + Vector v = 
list.get(0).get(); + for (int j = 0; j < matrix[i].length; j++) { + double total = Math.sqrt(vector[i]) * Math.sqrt(vector[j]) * matrix[i][j]; + assertEquals("Product matrix elements", total, v.get(j),EPSILON); + } + } + } +} Added: mahout/trunk/core/src/test/java/org/apache/mahout/clustering/spectral/eigencuts/TestEigencutsAffinityCutsJob.java URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/spectral/eigencuts/TestEigencutsAffinityCutsJob.java?rev=1003188&view=auto ============================================================================== --- mahout/trunk/core/src/test/java/org/apache/mahout/clustering/spectral/eigencuts/TestEigencutsAffinityCutsJob.java (added) +++ mahout/trunk/core/src/test/java/org/apache/mahout/clustering/spectral/eigencuts/TestEigencutsAffinityCutsJob.java Thu Sep 30 18:00:23 2010 @@ -0,0 +1,316 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.clustering.spectral.eigencuts; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.mahout.clustering.spectral.common.VertexWritable; +import org.apache.mahout.clustering.spectral.eigencuts.EigencutsAffinityCutsJob.EigencutsAffinityCutsCombiner; +import org.apache.mahout.clustering.spectral.eigencuts.EigencutsAffinityCutsJob.EigencutsAffinityCutsMapper; +import org.apache.mahout.clustering.spectral.eigencuts.EigencutsAffinityCutsJob.EigencutsAffinityCutsReducer; +import org.apache.mahout.common.DummyRecordWriter; +import org.apache.mahout.common.MahoutTestCase; +import org.apache.mahout.math.DenseVector; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorWritable; +import org.junit.Test; + +/** + * <p>Tests the Eigencuts affinity matrix "cut" ability, the core functionality + * of the algorithm responsible for making the clusterings.</p> + * + * <p>Due to the complexity of this section, and the amount of data required, + * there are three steps: the mapper essentially reads in the affinity/cut + * matrices and creating "vertices" of points, the combiner performs the + * actual checks on the sensitivities and zeroes out the necessary affinities, + * and at last the reducer reforms the affinity matrix.</p> + */ +public class TestEigencutsAffinityCutsJob extends MahoutTestCase { + + private final double [][] affinity = { {0, 10, 2, 1}, {10, 0, 2, 2}, + {2, 2, 0, 10}, {1, 2, 10, 0} }; + private final double [][] sensitivity = { {0, 0, 1, 1}, {0, 0, 1, 1}, + {1, 1, 0, 0}, {1, 1, 0, 0} }; + + /** + * Testing the mapper is fairly straightforward: there are two matrices + * to be processed 
simultaneously (cut matrix of sensitivities, and the + * affinity matrix), and since both are symmetric, two entries from each + * will be grouped together with the same key (or, in the case of an + * entry along the diagonal, only two entries). + * + * The correct grouping of these quad or pair vertices is the only + * output of the mapper. + * + * @throws Exception + */ + @Test + public void testEigencutsAffinityCutsMapper() throws Exception { + EigencutsAffinityCutsMapper mapper = new EigencutsAffinityCutsMapper(); + Configuration conf = new Configuration(); + conf.setInt(EigencutsKeys.AFFINITY_DIMENSIONS, this.affinity.length); + + // set up the writer + DummyRecordWriter<Text, VertexWritable> writer = + new DummyRecordWriter<Text, VertexWritable>(); + Mapper<IntWritable, VectorWritable, Text, VertexWritable>.Context context = + DummyRecordWriter.build(mapper, conf, writer); + + // perform the maps + for (int i = 0; i < this.affinity.length; i++) { + VectorWritable aff = new VectorWritable(new DenseVector(this.affinity[i])); + VectorWritable sens = new VectorWritable(new DenseVector(this.sensitivity[i])); + IntWritable key = new IntWritable(i); + mapper.map(key, aff, context); + mapper.map(key, sens, context); + } + + // were the vertices constructed correctly? 
if so, then for two 4x4 + // matrices, there should be 10 unique keys with 32 total entries + // (4 diagonal keys x 2 entries + 6 off-diagonal keys x 4 entries) + assertEquals("Number of keys", 10, writer.getKeys().size()); + for (int i = 0; i < this.affinity.length; i++) { + for (int j = 0; j < this.affinity.length; j++) { + Text key = new Text(Math.max(i, j) + "_" + Math.min(i,j)); + List<VertexWritable> values = writer.getValue(key); + + // if we're on a diagonal, there should only be 2 entries + // otherwise, there should be 4 + if (i == j) { + assertEquals("Diagonal entry", 2, values.size()); + for (VertexWritable v : values) { + assertFalse("Diagonal values are zero", v.getValue() > 0); + } + } else { + assertEquals("Off-diagonal entry", 4, values.size()); + if (i + j == 3) { // all have values greater than 0 + for (VertexWritable v : values) { + assertTrue("Off-diagonal non-zero entries", v.getValue() > 0); + } + } + } + } + } + } + + /** + * This is by far the trickiest step. However, an easy condition is if + * we have only two vertices - indicating vertices on the diagonal of the + * two matrices - then we simply exit (since the algorithm does not operate + * on the diagonal; it makes no sense to perform cuts by isolating data + * points from themselves). + * + * If there are four points, then first we must separate the two which + * belong to the affinity matrix from the two that are sensitivities. In theory, + * each pair should have exactly the same value (symmetry). If the sensitivity + * is below a certain threshold, then we set the two values of the affinity + * matrix to 0 (but not before adding the affinity values to the diagonal, so + * as to maintain the overall sum of the row of the affinity matrix). 
+ * + * @throws Exception + */ + @Test + public void testEigencutsAffinityCutsCombiner() throws Exception { + Configuration conf = new Configuration(); + Path affinity = new Path("affinity"); + Path sensitivity = new Path("sensitivity"); + conf.set(EigencutsKeys.AFFINITY_PATH, affinity.getName()); + conf.setInt(EigencutsKeys.AFFINITY_DIMENSIONS, this.affinity.length); + + // since we need the working paths to distinguish the vertex types, + // we can't use the mapper (since we have no way of manually setting + // the Context.workingPath() ) + Map<Text, List<VertexWritable>> data = buildMapData(affinity, sensitivity, this.sensitivity); + + // now, set up the combiner + EigencutsAffinityCutsCombiner combiner = new EigencutsAffinityCutsCombiner(); + DummyRecordWriter<Text, VertexWritable> redWriter = + new DummyRecordWriter<Text, VertexWritable>(); + Reducer<Text, VertexWritable, Text, VertexWritable>.Context + redContext = DummyRecordWriter.build(combiner, conf, redWriter, Text.class, + VertexWritable.class); + + // perform the combining + for (Text key : data.keySet()) { + combiner.reduce(key, data.get(key), redContext); + } + + // test the number of cuts, there should be 4 + assertEquals("Number of cuts detected", 4, + redContext.getCounter(EigencutsAffinityCutsJob.CUTSCOUNTER.NUM_CUTS).getValue()); + + // loop through all the results; let's see if they match up to our + // affinity matrix (and all the cuts appear where they should) + Map<Text, List<VertexWritable>> results = redWriter.getData(); + for (Text thekey : results.keySet()) { + List<VertexWritable> row = results.get(thekey); + IntWritable key = new IntWritable(Integer.parseInt(thekey.toString())); + + double calcDiag = 0.0, trueDiag = sumOfRowCuts(key.get(), this.sensitivity); + for (VertexWritable e : row) { + + // should the value have been cut, i.e. set to 0? 
+ if (key.get() == e.getCol()) { + // we have our diagonal + calcDiag += e.getValue(); + } else if (this.sensitivity[key.get()][e.getCol()] == 0.0) { + // no, corresponding affinity should have same value as before + assertEquals("Preserved affinity value", + this.affinity[key.get()][e.getCol()], e.getValue(),EPSILON); + } else { + // yes, corresponding affinity value should be 0 + assertEquals("Cut affinity value", 0.0, e.getValue(),EPSILON); + } + } + // check the diagonal has the correct sum + assertEquals("Diagonal sum from cuts", trueDiag, calcDiag,EPSILON); + } + } + + /** + * Fairly straightforward: the task here is to reassemble the rows of the + * affinity matrix. The tricky part is that any specific element in the list + * of elements which does NOT lay on the diagonal will be so because it + * did not drop below the sensitivity threshold, hence it was not "cut". + * + * On the flip side, there will be many entries whose coordinate is now + * set to the diagonal, indicating they were previously affinity entries + * whose sensitivities were below the threshold, and hence were "cut" - + * set to 0 at their original coordinates, and had their values added to + * the diagonal entry (hence the numerous entries with the coordinate of + * the diagonal). 
+ * + * @throws Exception + */ + @Test + public void testEigencutsAffinityCutsReducer() throws Exception { + Configuration conf = new Configuration(); + Path affinity = new Path("affinity"); + Path sensitivity = new Path("sensitivity"); + conf.set(EigencutsKeys.AFFINITY_PATH, affinity.getName()); + conf.setInt(EigencutsKeys.AFFINITY_DIMENSIONS, this.affinity.length); + + // since we need the working paths to distinguish the vertex types, + // we can't use the mapper (since we have no way of manually setting + // the Context.workingPath() ) + Map<Text, List<VertexWritable>> data = buildMapData(affinity, sensitivity, this.sensitivity); + + // now, set up the combiner + EigencutsAffinityCutsCombiner combiner = new EigencutsAffinityCutsCombiner(); + DummyRecordWriter<Text, VertexWritable> comWriter = + new DummyRecordWriter<Text, VertexWritable>(); + Reducer<Text, VertexWritable, Text, VertexWritable>.Context + comContext = DummyRecordWriter.build(combiner, conf, comWriter, Text.class, + VertexWritable.class); + + // perform the combining + for (Text key : data.keySet()) { + combiner.reduce(key, data.get(key), comContext); + } + + // finally, set up the reduction writers + EigencutsAffinityCutsReducer reducer = new EigencutsAffinityCutsReducer(); + DummyRecordWriter<IntWritable, VectorWritable> redWriter = new + DummyRecordWriter<IntWritable, VectorWritable>(); + Reducer<Text, VertexWritable, IntWritable, VectorWritable>.Context + redContext = DummyRecordWriter.build(reducer, conf, redWriter, + Text.class, VertexWritable.class); + + // perform the reduction + for (Text key : comWriter.getKeys()) { + reducer.reduce(key, comWriter.getValue(key), redContext); + } + + // now, check that the affinity matrix is correctly formed + for (IntWritable row : redWriter.getKeys()) { + List<VectorWritable> results = redWriter.getValue(row); + // there should only be 1 vector + assertEquals("Only one vector with a given row number", 1, results.size()); + Vector therow = 
results.get(0).get(); + for (Vector.Element e : therow) { + // check the diagonal + if (row.get() == e.index()) { + assertEquals("Correct diagonal sum of cuts", sumOfRowCuts(row.get(), + this.sensitivity), e.get(),EPSILON); + } else { + // not on the diagonal...if it was an element labeled to be cut, + // it should have a value of 0. Otherwise, it should have kept its + // previous value + if (this.sensitivity[row.get()][e.index()] != 0.0) { + // should be 0 + assertEquals("Cut element", 0.0, e.get(),EPSILON); + } else { + // should be what it was originally + assertEquals("Preserved element", this.affinity[row.get()][e.index()], e.get(),EPSILON); + } + } + } + } + } + + /** + * Utility method for simulating the Mapper behavior. + * @param affinity + * @param sensitivity + * @param array + * @return + */ + private Map<Text, List<VertexWritable>> buildMapData(Path affinity, + Path sensitivity, double [][] array) { + Map<Text, List<VertexWritable>> map = new HashMap<Text, List<VertexWritable>>(); + for (int i = 0; i < this.affinity.length; i++) { + for (int j = 0; j < this.affinity[i].length; j++) { + Text key = new Text(Math.max(i, j) + "_" + Math.min(i, j)); + List<VertexWritable> toAdd = new ArrayList<VertexWritable>(); + if (map.containsKey(key)) { + toAdd = map.get(key); + map.remove(key); + } + toAdd.add(new VertexWritable(i, j, this.affinity[i][j], affinity.getName())); + toAdd.add(new VertexWritable(i, j, array[i][j], sensitivity.getName())); + map.put(key, toAdd); + } + } + return map; + } + + /** + * Utility method for calculating the new diagonal on the specified row of the + * affinity matrix after a single iteration, given the specified cut matrix + * @param row + * @param cuts + * @return + */ + private double sumOfRowCuts(int row, double [][] cuts) { + double retval = 0.0; + for (int j = 0; j < this.affinity[row].length; j++) { + if (cuts[row][j] != 0.0) { + retval += this.affinity[row][j]; + } + } + return retval; + } +} Added: 
mahout/trunk/core/src/test/java/org/apache/mahout/clustering/spectral/eigencuts/TestEigencutsSensitivityJob.java URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/spectral/eigencuts/TestEigencutsSensitivityJob.java?rev=1003188&view=auto ============================================================================== --- mahout/trunk/core/src/test/java/org/apache/mahout/clustering/spectral/eigencuts/TestEigencutsSensitivityJob.java (added) +++ mahout/trunk/core/src/test/java/org/apache/mahout/clustering/spectral/eigencuts/TestEigencutsSensitivityJob.java Thu Sep 30 18:00:23 2010 @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.clustering.spectral.eigencuts; + +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.mahout.common.DummyRecordWriter; +import org.apache.mahout.common.MahoutTestCase; +import org.apache.mahout.math.DenseVector; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorWritable; +import org.junit.Test; + +/** + * <p>Tests the Eigencuts M/R process for generating perturbation sensitivities + * in a weighted graph.</p> + * + * <p>This process requires a lot of inputs. Please read the + * EigencutsSensitivityJob javadocs for more information on these variables. + * For now, the tests below use the hard-coded diagonal, eigenvector, and + * eigenvalue fixtures declared in this class.</p> + */ +public class TestEigencutsSensitivityJob extends MahoutTestCase { + + /* + private final double [][] affinity = { {0, 0.9748, 0.6926, 0.6065}, + {0.9748, 0, 0.7178, 0.6350}, + {0.6926, 0.7178, 0, 0.9898}, + {0.6065, 0.6350, 0.9898, 0} }; + */ + private final double [] diagonal = {2.2739, 2.3276, 2.4002, 2.2313}; + + private final double [][] eigenvectors = { {-0.4963, -0.5021, -0.5099, -0.4916}, + {-0.5143, -0.4841, 0.4519, 0.5449}, + {-0.6858, 0.7140, -0.1146, 0.0820}, + {0.1372, -0.0616, -0.7230, 0.6743} }; + private final double [] eigenvalues = {1.000, -0.1470, -0.4238, -0.4293}; + + /** + * This is the toughest step, primarily because of the intensity of + * the calculations that are performed and the amount of data required. + * Four parameters in particular - the list of eigenvalues, the + * vector representing the diagonal matrix, and the scalars beta0 and + * epsilon - must be set here prior to the start of the mapper. Once + * the mapper is executed, it iterates over a matrix of all corresponding + * eigenvectors. 
+ * @throws Exception + */ + @Test + public void testEigencutsSensitivityMapper() throws Exception { + EigencutsSensitivityMapper mapper = new EigencutsSensitivityMapper(); + Configuration conf = new Configuration(); + + // construct the writers + DummyRecordWriter<IntWritable, EigencutsSensitivityNode> writer = + new DummyRecordWriter<IntWritable, EigencutsSensitivityNode>(); + Mapper<IntWritable, VectorWritable, IntWritable, EigencutsSensitivityNode>.Context + context = DummyRecordWriter.build(mapper, conf, writer); + mapper.setup(2.0, 0.25, new DenseVector(eigenvalues), new DenseVector(diagonal)); + + // perform the mapping + for (int i = 0; i < eigenvectors.length; i++) { + VectorWritable row = new VectorWritable(new DenseVector(eigenvectors[i])); + mapper.map(new IntWritable(i), row, context); + } + + // the results line up + for (IntWritable key : writer.getKeys()) { + List<EigencutsSensitivityNode> list = writer.getValue(key); + assertEquals("Only one result per row", 1, list.size()); + EigencutsSensitivityNode item = list.get(0); + assertTrue("Sensitivity values are correct", Math.abs(item.getSensitivity() + 0.48) < 0.01); + } + } + + /** + * This step will simply assemble sensitivities into one coherent matrix. 
+ * @throws Exception + */ + @Test + public void testEigencutsSensitivityReducer() throws Exception { + EigencutsSensitivityMapper mapper = new EigencutsSensitivityMapper(); + Configuration conf = new Configuration(); + conf.setInt(EigencutsKeys.AFFINITY_DIMENSIONS, eigenvectors.length); + + // construct the writers + DummyRecordWriter<IntWritable, EigencutsSensitivityNode> mapWriter = + new DummyRecordWriter<IntWritable, EigencutsSensitivityNode>(); + Mapper<IntWritable, VectorWritable, IntWritable, EigencutsSensitivityNode>.Context + mapContext = DummyRecordWriter.build(mapper, conf, mapWriter); + mapper.setup(2.0, 0.25, new DenseVector(eigenvalues), new DenseVector(diagonal)); + + // perform the mapping + for (int i = 0; i < eigenvectors.length; i++) { + VectorWritable row = new VectorWritable(new DenseVector(eigenvectors[i])); + mapper.map(new IntWritable(i), row, mapContext); + } + + // set up the values for the reducer + conf.set(EigencutsKeys.DELTA, "1.0"); + conf.set(EigencutsKeys.TAU, "-0.1"); + + EigencutsSensitivityReducer reducer = new EigencutsSensitivityReducer(); + // set up the writers + DummyRecordWriter<IntWritable, VectorWritable> redWriter = new + DummyRecordWriter<IntWritable, VectorWritable>(); + Reducer<IntWritable, EigencutsSensitivityNode, IntWritable, VectorWritable>.Context + redContext = DummyRecordWriter.build(reducer, conf, redWriter, + IntWritable.class, EigencutsSensitivityNode.class); + + // perform the reduction + for (IntWritable key : mapWriter.getKeys()) { + reducer.reduce(key, mapWriter.getValue(key), redContext); + } + + // since all the sensitivities were below the threshold, + // each of them should have survived + for (IntWritable key : redWriter.getKeys()) { + List<VectorWritable> list = redWriter.getValue(key); + assertEquals("One item in the list", 1, list.size()); + Vector item = list.get(0).get(); + + // should only be one non-zero item + assertTrue("One non-zero item in the array", Math.abs(item.zSum() + 0.48) < 
0.01); + } + + } +}
