http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/TestDistributedRowMatrix.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/TestDistributedRowMatrix.java b/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/TestDistributedRowMatrix.java new file mode 100644 index 0000000..13da38a --- /dev/null +++ b/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/TestDistributedRowMatrix.java @@ -0,0 +1,395 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.math.hadoop; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Job; +import org.apache.mahout.clustering.ClusteringTestUtils; +import org.apache.mahout.common.HadoopUtil; +import org.apache.mahout.common.MahoutTestCase; +import org.apache.mahout.common.iterator.sequencefile.PathFilters; +import org.apache.mahout.math.DenseVector; +import org.apache.mahout.math.Matrix; +import org.apache.mahout.math.MatrixSlice; +import org.apache.mahout.math.RandomAccessSparseVector; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorIterable; +import org.apache.mahout.math.VectorWritable; +import org.apache.mahout.math.decomposer.SolverTest; +import org.apache.mahout.math.function.Functions; +import org.junit.Test; + +import com.google.common.base.Function; +import com.google.common.collect.Iterators; +import com.google.common.collect.Maps; + +public final class TestDistributedRowMatrix extends MahoutTestCase { + public static final String TEST_PROPERTY_KEY = "test.property.key"; + public static final String TEST_PROPERTY_VALUE = "test.property.value"; + + private static void assertEquals(VectorIterable m, VectorIterable mtt, double errorTolerance) { + Iterator<MatrixSlice> mIt = m.iterateAll(); + Iterator<MatrixSlice> mttIt = mtt.iterateAll(); + Map<Integer, Vector> mMap = Maps.newHashMap(); + Map<Integer, Vector> mttMap = Maps.newHashMap(); + while (mIt.hasNext() && mttIt.hasNext()) { + MatrixSlice ms = mIt.next(); + mMap.put(ms.index(), ms.vector()); + MatrixSlice mtts = mttIt.next(); + mttMap.put(mtts.index(), mtts.vector()); + } + for (Map.Entry<Integer, Vector> entry : mMap.entrySet()) { + Integer key = entry.getKey(); + Vector value = entry.getValue(); + if (value == null || mttMap.get(key) == null) { + assertTrue(value == null || value.norm(2) == 0); + assertTrue(mttMap.get(key) == null || mttMap.get(key).norm(2) == 0); + } 
else { + assertTrue( + value.getDistanceSquared(mttMap.get(key)) < errorTolerance); + } + } + } + + @Test + public void testTranspose() throws Exception { + DistributedRowMatrix m = randomDistributedMatrix(10, 9, 5, 4, 1.0, false); + m.setConf(getConfiguration()); + DistributedRowMatrix mt = m.transpose(); + mt.setConf(getConfiguration()); + + Path tmpPath = getTestTempDirPath(); + m.setOutputTempPathString(tmpPath.toString()); + Path tmpOutPath = new Path(tmpPath, "tmpOutTranspose"); + mt.setOutputTempPathString(tmpOutPath.toString()); + HadoopUtil.delete(getConfiguration(), tmpOutPath); + DistributedRowMatrix mtt = mt.transpose(); + assertEquals(m, mtt, EPSILON); + } + + @Test + public void testMatrixColumnMeansJob() throws Exception { + Matrix m = + SolverTest.randomSequentialAccessSparseMatrix(100, 90, 50, 20, 1.0); + DistributedRowMatrix dm = + randomDistributedMatrix(100, 90, 50, 20, 1.0, false); + dm.setConf(getConfiguration()); + + Vector expected = new DenseVector(50); + for (int i = 0; i < m.numRows(); i++) { + expected.assign(m.viewRow(i), Functions.PLUS); + } + expected.assign(Functions.DIV, m.numRows()); + Vector actual = dm.columnMeans("DenseVector"); + assertEquals(0.0, expected.getDistanceSquared(actual), EPSILON); + } + + @Test + public void testNullMatrixColumnMeansJob() throws Exception { + Matrix m = + SolverTest.randomSequentialAccessSparseMatrix(100, 90, 0, 0, 1.0); + DistributedRowMatrix dm = + randomDistributedMatrix(100, 90, 0, 0, 1.0, false); + dm.setConf(getConfiguration()); + + Vector expected = new DenseVector(0); + for (int i = 0; i < m.numRows(); i++) { + expected.assign(m.viewRow(i), Functions.PLUS); + } + expected.assign(Functions.DIV, m.numRows()); + Vector actual = dm.columnMeans(); + assertEquals(0.0, expected.getDistanceSquared(actual), EPSILON); + } + + @Test + public void testMatrixTimesVector() throws Exception { + Vector v = new RandomAccessSparseVector(50); + v.assign(1.0); + Matrix m = SolverTest.randomSequentialAccessSparseMatrix(100, 90, 50, 20, 1.0); + DistributedRowMatrix dm = randomDistributedMatrix(100, 90, 50, 20, 1.0, false); + dm.setConf(getConfiguration()); + + Vector expected = m.times(v); + Vector actual = dm.times(v); + assertEquals(0.0, expected.getDistanceSquared(actual), EPSILON); + } + + @Test + public void testMatrixTimesSquaredVector() throws Exception { + Vector v = new RandomAccessSparseVector(50); + v.assign(1.0); + Matrix m = SolverTest.randomSequentialAccessSparseMatrix(100, 90, 50, 20, 1.0); + DistributedRowMatrix dm = randomDistributedMatrix(100, 90, 50, 20, 1.0, false); + dm.setConf(getConfiguration()); + + Vector expected = m.timesSquared(v); + Vector actual = dm.timesSquared(v); + assertEquals(0.0, expected.getDistanceSquared(actual), 1.0e-9); + } + + @Test + public void testMatrixTimesMatrix() throws Exception { + Matrix inputA = SolverTest.randomSequentialAccessSparseMatrix(20, 19, 15, 5, 10.0); + Matrix inputB = SolverTest.randomSequentialAccessSparseMatrix(20, 13, 25, 10, 5.0); + Matrix expected = inputA.transpose().times(inputB); + + DistributedRowMatrix distA = randomDistributedMatrix(20, 19, 15, 5, 10.0, false, "distA"); + distA.setConf(getConfiguration()); + DistributedRowMatrix distB = randomDistributedMatrix(20, 13, 25, 10, 5.0, false, "distB"); + distB.setConf(getConfiguration()); + DistributedRowMatrix product = distA.times(distB); + + assertEquals(expected, product, EPSILON); + } + + @Test + public void testMatrixMultiplicationJobConfBuilder() throws Exception { + Configuration initialConf =
createInitialConf(); + + Path baseTmpDirPath = getTestTempDirPath("testpaths"); + Path aPath = new Path(baseTmpDirPath, "a"); + Path bPath = new Path(baseTmpDirPath, "b"); + Path outPath = new Path(baseTmpDirPath, "out"); + + Configuration mmJobConf = MatrixMultiplicationJob.createMatrixMultiplyJobConf(aPath, bPath, outPath, 10); + Configuration mmCustomJobConf = MatrixMultiplicationJob.createMatrixMultiplyJobConf(initialConf, + aPath, + bPath, + outPath, + 10); + + assertNull(mmJobConf.get(TEST_PROPERTY_KEY)); + assertEquals(TEST_PROPERTY_VALUE, mmCustomJobConf.get(TEST_PROPERTY_KEY)); + } + + @Test + public void testTransposeJobConfBuilder() throws Exception { + Configuration initialConf = createInitialConf(); + + Path baseTmpDirPath = getTestTempDirPath("testpaths"); + Path inputPath = new Path(baseTmpDirPath, "input"); + Path outputPath = new Path(baseTmpDirPath, "output"); + + Configuration transposeJobConf = TransposeJob.buildTransposeJob(inputPath, outputPath, 10).getConfiguration(); + + Configuration transposeCustomJobConf = TransposeJob.buildTransposeJob(initialConf, inputPath, outputPath, 10) + .getConfiguration(); + + assertNull(transposeJobConf.get(TEST_PROPERTY_KEY)); + assertEquals(TEST_PROPERTY_VALUE, transposeCustomJobConf.get(TEST_PROPERTY_KEY)); + } + + @Test public void testTimesSquaredJobConfBuilders() throws Exception { + Configuration initialConf = createInitialConf(); + + Path baseTmpDirPath = getTestTempDirPath("testpaths"); + Path inputPath = new Path(baseTmpDirPath, "input"); + Path outputPath = new Path(baseTmpDirPath, "output"); + + Vector v = new RandomAccessSparseVector(50); + v.assign(1.0); + + Job timesSquaredJob1 = TimesSquaredJob.createTimesSquaredJob(v, inputPath, outputPath); + Job customTimesSquaredJob1 = TimesSquaredJob.createTimesSquaredJob(initialConf, v, inputPath, outputPath); + + assertNull(timesSquaredJob1.getConfiguration().get(TEST_PROPERTY_KEY)); + assertEquals(TEST_PROPERTY_VALUE, customTimesSquaredJob1.getConfiguration().get(TEST_PROPERTY_KEY)); + + Job timesJob = TimesSquaredJob.createTimesJob(v, 50, inputPath, outputPath); + Job customTimesJob = TimesSquaredJob.createTimesJob(initialConf, v, 50, inputPath, outputPath); + + assertNull(timesJob.getConfiguration().get(TEST_PROPERTY_KEY)); + assertEquals(TEST_PROPERTY_VALUE, customTimesJob.getConfiguration().get(TEST_PROPERTY_KEY)); + + Job timesSquaredJob2 = TimesSquaredJob.createTimesSquaredJob(v, inputPath, outputPath, + TimesSquaredJob.TimesSquaredMapper.class, TimesSquaredJob.VectorSummingReducer.class); + + Job customTimesSquaredJob2 = TimesSquaredJob.createTimesSquaredJob(initialConf, v, inputPath, + outputPath, TimesSquaredJob.TimesSquaredMapper.class, TimesSquaredJob.VectorSummingReducer.class); + + assertNull(timesSquaredJob2.getConfiguration().get(TEST_PROPERTY_KEY)); + assertEquals(TEST_PROPERTY_VALUE, customTimesSquaredJob2.getConfiguration().get(TEST_PROPERTY_KEY)); + + Job timesSquaredJob3 = TimesSquaredJob.createTimesSquaredJob(v, 50, inputPath, outputPath, + TimesSquaredJob.TimesSquaredMapper.class, TimesSquaredJob.VectorSummingReducer.class); + + Job customTimesSquaredJob3 = TimesSquaredJob.createTimesSquaredJob(initialConf, + v, 50, inputPath, outputPath, TimesSquaredJob.TimesSquaredMapper.class, + TimesSquaredJob.VectorSummingReducer.class); + + assertNull(timesSquaredJob3.getConfiguration().get(TEST_PROPERTY_KEY)); + assertEquals(TEST_PROPERTY_VALUE, customTimesSquaredJob3.getConfiguration().get(TEST_PROPERTY_KEY)); + } + + @Test + public void 
testTimesVectorTempDirDeletion() throws Exception { + Configuration conf = getConfiguration(); + Vector v = new RandomAccessSparseVector(50); + v.assign(1.0); + DistributedRowMatrix dm = randomDistributedMatrix(100, 90, 50, 20, 1.0, false); + dm.setConf(conf); + + Path outputPath = dm.getOutputTempPath(); + FileSystem fs = outputPath.getFileSystem(conf); + + deleteContentsOfPath(conf, outputPath); + + assertEquals(0, HadoopUtil.listStatus(fs, outputPath).length); + + Vector result1 = dm.times(v); + + assertEquals(0, HadoopUtil.listStatus(fs, outputPath).length); + + deleteContentsOfPath(conf, outputPath); + assertEquals(0, HadoopUtil.listStatus(fs, outputPath).length); + + conf.setBoolean(DistributedRowMatrix.KEEP_TEMP_FILES, true); + dm.setConf(conf); + + Vector result2 = dm.times(v); + + FileStatus[] outputStatuses = fs.listStatus(outputPath); + assertEquals(1, outputStatuses.length); + Path outputTempPath = outputStatuses[0].getPath(); + Path inputVectorPath = new Path(outputTempPath, TimesSquaredJob.INPUT_VECTOR); + Path outputVectorPath = new Path(outputTempPath, TimesSquaredJob.OUTPUT_VECTOR_FILENAME); + assertEquals(1, fs.listStatus(inputVectorPath, PathFilters.logsCRCFilter()).length); + assertEquals(1, fs.listStatus(outputVectorPath, PathFilters.logsCRCFilter()).length); + + assertEquals(0.0, result1.getDistanceSquared(result2), EPSILON); + } + + @Test + public void testTimesSquaredVectorTempDirDeletion() throws Exception { + Configuration conf = getConfiguration(); + Vector v = new RandomAccessSparseVector(50); + v.assign(1.0); + DistributedRowMatrix dm = randomDistributedMatrix(100, 90, 50, 20, 1.0, false); + dm.setConf(getConfiguration()); + + Path outputPath = dm.getOutputTempPath(); + FileSystem fs = outputPath.getFileSystem(conf); + + deleteContentsOfPath(conf, outputPath); + + assertEquals(0, HadoopUtil.listStatus(fs, outputPath).length); + + Vector result1 = dm.timesSquared(v); + + assertEquals(0, HadoopUtil.listStatus(fs, outputPath).length); + + deleteContentsOfPath(conf, outputPath); + assertEquals(0, HadoopUtil.listStatus(fs, outputPath).length); + + conf.setBoolean(DistributedRowMatrix.KEEP_TEMP_FILES, true); + dm.setConf(conf); + + Vector result2 = dm.timesSquared(v); + + FileStatus[] outputStatuses = fs.listStatus(outputPath); + assertEquals(1, outputStatuses.length); + Path outputTempPath = outputStatuses[0].getPath(); + Path inputVectorPath = new Path(outputTempPath, TimesSquaredJob.INPUT_VECTOR); + Path outputVectorPath = new Path(outputTempPath, TimesSquaredJob.OUTPUT_VECTOR_FILENAME); + assertEquals(1, fs.listStatus(inputVectorPath, PathFilters.logsCRCFilter()).length); + assertEquals(1, fs.listStatus(outputVectorPath, PathFilters.logsCRCFilter()).length); + + assertEquals(0.0, result1.getDistanceSquared(result2), EPSILON); + } + + public Configuration createInitialConf() throws IOException { + Configuration initialConf = getConfiguration(); + initialConf.set(TEST_PROPERTY_KEY, TEST_PROPERTY_VALUE); + return initialConf; + } + + private static void deleteContentsOfPath(Configuration conf, Path path) throws Exception { + FileSystem fs = path.getFileSystem(conf); + + FileStatus[] statuses = HadoopUtil.listStatus(fs, path); + for (FileStatus status : statuses) { + fs.delete(status.getPath(), true); + } + } + + public DistributedRowMatrix randomDistributedMatrix(int numRows, + int nonNullRows, + int numCols, + int entriesPerRow, + double entryMean, + boolean isSymmetric) throws IOException { + return randomDistributedMatrix(numRows, nonNullRows, numCols, 
entriesPerRow, entryMean, isSymmetric, "testdata"); + } + + public DistributedRowMatrix randomDenseHierarchicalDistributedMatrix(int numRows, + int numCols, + boolean isSymmetric, + String baseTmpDirSuffix) + throws IOException { + Path baseTmpDirPath = getTestTempDirPath(baseTmpDirSuffix); + Matrix c = SolverTest.randomHierarchicalMatrix(numRows, numCols, isSymmetric); + return saveToFs(c, baseTmpDirPath); + } + + public DistributedRowMatrix randomDistributedMatrix(int numRows, + int nonNullRows, + int numCols, + int entriesPerRow, + double entryMean, + boolean isSymmetric, + String baseTmpDirSuffix) throws IOException { + Path baseTmpDirPath = getTestTempDirPath(baseTmpDirSuffix); + Matrix c = SolverTest.randomSequentialAccessSparseMatrix(numRows, nonNullRows, numCols, entriesPerRow, entryMean); + if (isSymmetric) { + c = c.times(c.transpose()); + } + return saveToFs(c, baseTmpDirPath); + } + + private DistributedRowMatrix saveToFs(final Matrix m, Path baseTmpDirPath) throws IOException { + Configuration conf = getConfiguration(); + FileSystem fs = FileSystem.get(baseTmpDirPath.toUri(), conf); + + ClusteringTestUtils.writePointsToFile(new Iterable<VectorWritable>() { + @Override + public Iterator<VectorWritable> iterator() { + return Iterators.transform(m.iterator(), new Function<MatrixSlice,VectorWritable>() { + @Override + public VectorWritable apply(MatrixSlice input) { + return new VectorWritable(input.vector()); + } + }); + } + }, true, new Path(baseTmpDirPath, "distMatrix/part-00000"), fs, conf); + + DistributedRowMatrix distMatrix = new DistributedRowMatrix(new Path(baseTmpDirPath, "distMatrix"), + new Path(baseTmpDirPath, "tmpOut"), + m.numRows(), + m.numCols()); + distMatrix.setConf(new Configuration(conf)); + + return distMatrix; + } +}
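The saveToFs helper above is the bridge that every other test in this commit leans on: each MatrixSlice is serialized as one VectorWritable record in a SequenceFile part, and a DistributedRowMatrix is then pointed at the directory holding the part files. A minimal sketch of the same round trip, assuming it runs inside a MahoutTestCase method with this file's imports plus java.util.List and Guava's Lists; the "example" suffix, dimensions, and row values are illustrative only:

    // Two rows of a 2x3 matrix, each written as one VectorWritable record.
    List<VectorWritable> rows = Lists.newArrayList(
        new VectorWritable(new DenseVector(new double[] { 1, 0, 2 })),
        new VectorWritable(new DenseVector(new double[] { 0, 3, 0 })));

    Configuration conf = getConfiguration();
    Path rowPath = new Path(getTestTempDirPath("example"), "distMatrix/part-00000");
    FileSystem fs = FileSystem.get(rowPath.toUri(), conf);
    ClusteringTestUtils.writePointsToFile(rows, true, rowPath, fs, conf);

    // A DistributedRowMatrix is addressed by the directory of part files,
    // a scratch path for job output, and its logical dimensions.
    DistributedRowMatrix drm = new DistributedRowMatrix(
        rowPath.getParent(), new Path(getTestTempDirPath("example"), "tmpOut"), 2, 3);
    drm.setConf(conf);
    Vector y = drm.times(new DenseVector(new double[] { 1, 1, 1 })); // launches the times() MapReduce job

Every assertion in this class then reduces to computing the same product on the in-memory Matrix and comparing it with the distributed result via getDistanceSquared.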
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/decomposer/TestDistributedLanczosSolver.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/decomposer/TestDistributedLanczosSolver.java b/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/decomposer/TestDistributedLanczosSolver.java new file mode 100644 index 0000000..ac01c28 --- /dev/null +++ b/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/decomposer/TestDistributedLanczosSolver.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.math.hadoop.decomposer; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.mahout.common.MahoutTestCase; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.decomposer.SolverTest; +import org.apache.mahout.math.decomposer.lanczos.LanczosState; +import org.apache.mahout.math.hadoop.DistributedRowMatrix; +import org.apache.mahout.math.hadoop.TestDistributedRowMatrix; +import org.junit.Before; + +import java.io.File; +import java.io.IOException; + +@Deprecated +public final class TestDistributedLanczosSolver extends MahoutTestCase { + + private int counter = 0; + private DistributedRowMatrix symCorpus; + private DistributedRowMatrix asymCorpus; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + File symTestData = getTestTempDir("symTestData"); + File asymTestData = getTestTempDir("asymTestData"); + symCorpus = new TestDistributedRowMatrix().randomDistributedMatrix(100, + 90, 80, 2, 10.0, true, symTestData.getAbsolutePath()); + asymCorpus = new TestDistributedRowMatrix().randomDistributedMatrix(100, + 90, 80, 2, 10.0, false, asymTestData.getAbsolutePath()); + } + + private static String suf(boolean symmetric) { + return symmetric ? "_sym" : "_asym"; + } + + private DistributedRowMatrix getCorpus(boolean symmetric) { + return symmetric ? 
symCorpus : asymCorpus; + } + + /* + private LanczosState doTestDistributedLanczosSolver(boolean symmetric, + int desiredRank) throws IOException { + return doTestDistributedLanczosSolver(symmetric, desiredRank, true); + } + */ + + private LanczosState doTestDistributedLanczosSolver(boolean symmetric, + int desiredRank, boolean hdfsBackedState) + throws IOException { + DistributedRowMatrix corpus = getCorpus(symmetric); + Configuration conf = getConfiguration(); + corpus.setConf(conf); + DistributedLanczosSolver solver = new DistributedLanczosSolver(); + Vector initialVector = DistributedLanczosSolver.getInitialVector(corpus); + LanczosState state; + if (hdfsBackedState) { + HdfsBackedLanczosState hState = new HdfsBackedLanczosState(corpus, + desiredRank, initialVector, new Path(getTestTempDirPath(), + "lanczosStateDir" + suf(symmetric) + counter)); + hState.setConf(conf); + state = hState; + } else { + state = new LanczosState(corpus, desiredRank, initialVector); + } + solver.solve(state, desiredRank, symmetric); + SolverTest.assertOrthonormal(state); + for (int i = 0; i < desiredRank/2; i++) { + SolverTest.assertEigen(i, state.getRightSingularVector(i), corpus, 0.1, symmetric); + } + counter++; + return state; + } + + public void doTestResumeIteration(boolean symmetric) throws IOException { + DistributedRowMatrix corpus = getCorpus(symmetric); + Configuration conf = getConfiguration(); + corpus.setConf(conf); + DistributedLanczosSolver solver = new DistributedLanczosSolver(); + int rank = 10; + Vector initialVector = DistributedLanczosSolver.getInitialVector(corpus); + HdfsBackedLanczosState state = new HdfsBackedLanczosState(corpus, rank, + initialVector, new Path(getTestTempDirPath(), "lanczosStateDir" + suf(symmetric) + counter)); + solver.solve(state, rank, symmetric); + + rank *= 2; + state = new HdfsBackedLanczosState(corpus, rank, + initialVector, new Path(getTestTempDirPath(), "lanczosStateDir" + suf(symmetric) + counter)); + solver = new DistributedLanczosSolver(); + solver.solve(state, rank, symmetric); + + LanczosState allAtOnceState = doTestDistributedLanczosSolver(symmetric, rank, false); + for (int i = 0; i < state.getIterationNumber(); i++) { + Vector v = state.getBasisVector(i).normalize(); + Vector w = allAtOnceState.getBasisVector(i).normalize(); + double diff = v.minus(w).norm(2); + assertTrue("basis " + i + " differs too much: " + diff, diff < 0.1); + } + counter++; + } + + // TODO when this can be made to run in under 20 minutes, re-enable + /* + @Test + public void testDistributedLanczosSolver() throws Exception { + doTestDistributedLanczosSolver(true, 30); + doTestDistributedLanczosSolver(false, 30); + doTestResumeIteration(true); + doTestResumeIteration(false); + } + */ + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/decomposer/TestDistributedLanczosSolverCLI.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/decomposer/TestDistributedLanczosSolverCLI.java b/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/decomposer/TestDistributedLanczosSolverCLI.java new file mode 100644 index 0000000..5dfb328 --- /dev/null +++ b/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/decomposer/TestDistributedLanczosSolverCLI.java @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.math.hadoop.decomposer; + +import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.ToolRunner; +import org.apache.mahout.common.MahoutTestCase; +import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterable; +import org.apache.mahout.math.DenseMatrix; +import org.apache.mahout.math.Matrix; +import org.apache.mahout.math.NamedVector; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorWritable; +import org.apache.mahout.math.hadoop.DistributedRowMatrix; +import org.apache.mahout.math.hadoop.TestDistributedRowMatrix; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Arrays; + +@Deprecated +public final class TestDistributedLanczosSolverCLI extends MahoutTestCase { + private static final Logger log = LoggerFactory.getLogger(TestDistributedLanczosSolverCLI.class); + + @Test + public void testDistributedLanczosSolverCLI() throws Exception { + Path testData = getTestTempDirPath("testdata"); + DistributedRowMatrix corpus = + new TestDistributedRowMatrix().randomDenseHierarchicalDistributedMatrix(10, 9, false, + testData.toString()); + corpus.setConf(getConfiguration()); + Path output = getTestTempDirPath("output"); + Path tmp = getTestTempDirPath("tmp"); + Path workingDir = getTestTempDirPath("working"); + String[] args = { + "-i", new Path(testData, "distMatrix").toString(), + "-o", output.toString(), + "--tempDir", tmp.toString(), + "--numRows", "10", + "--numCols", "9", + "--rank", "6", + "--symmetric", "false", + "--workingDir", workingDir.toString() + }; + ToolRunner.run(getConfiguration(), new DistributedLanczosSolver().new DistributedLanczosSolverJob(), args); + + output = getTestTempDirPath("output2"); + tmp = getTestTempDirPath("tmp2"); + args = new String[] { + "-i", new Path(testData, "distMatrix").toString(), + "-o", output.toString(), + "--tempDir", tmp.toString(), + "--numRows", "10", + "--numCols", "9", + "--rank", "7", + "--symmetric", "false", + "--workingDir", workingDir.toString() + }; + ToolRunner.run(getConfiguration(), new DistributedLanczosSolver().new DistributedLanczosSolverJob(), args); + + Path rawEigenvectors = new Path(output, DistributedLanczosSolver.RAW_EIGENVECTORS); + Matrix eigenVectors = new DenseMatrix(7, corpus.numCols()); + Configuration conf = getConfiguration(); + + int i = 0; + for (VectorWritable value : new SequenceFileValueIterable<VectorWritable>(rawEigenvectors, conf)) { + Vector v = value.get(); + eigenVectors.assignRow(i, v); + i++; + } + assertEquals("number of eigenvectors", 7, i); + } + + @Test + public void testDistributedLanczosSolverEVJCLI() throws Exception { + Path testData = getTestTempDirPath("testdata"); + 
DistributedRowMatrix corpus = new TestDistributedRowMatrix() + .randomDenseHierarchicalDistributedMatrix(10, 9, false, testData.toString()); + corpus.setConf(getConfiguration()); + Path output = getTestTempDirPath("output"); + Path tmp = getTestTempDirPath("tmp"); + String[] args = { + "-i", new Path(testData, "distMatrix").toString(), + "-o", output.toString(), + "--tempDir", tmp.toString(), + "--numRows", "10", + "--numCols", "9", + "--rank", "6", + "--symmetric", "false", + "--cleansvd", "true" + }; + ToolRunner.run(getConfiguration(), new DistributedLanczosSolver().new DistributedLanczosSolverJob(), args); + + Path cleanEigenvectors = new Path(output, EigenVerificationJob.CLEAN_EIGENVECTORS); + Matrix eigenVectors = new DenseMatrix(6, corpus.numCols()); + Collection<Double> eigenvalues = Lists.newArrayList(); + + output = getTestTempDirPath("output2"); + tmp = getTestTempDirPath("tmp2"); + args = new String[] { + "-i", new Path(testData, "distMatrix").toString(), + "-o", output.toString(), + "--tempDir", tmp.toString(), + "--numRows", "10", + "--numCols", "9", + "--rank", "7", + "--symmetric", "false", + "--cleansvd", "true" + }; + ToolRunner.run(getConfiguration(), new DistributedLanczosSolver().new DistributedLanczosSolverJob(), args); + Path cleanEigenvectors2 = new Path(output, EigenVerificationJob.CLEAN_EIGENVECTORS); + Matrix eigenVectors2 = new DenseMatrix(7, corpus.numCols()); + Configuration conf = getConfiguration(); + Collection<Double> newEigenValues = Lists.newArrayList(); + + int i = 0; + for (VectorWritable value : new SequenceFileValueIterable<VectorWritable>(cleanEigenvectors, conf)) { + NamedVector v = (NamedVector) value.get(); + eigenVectors.assignRow(i, v); + log.info(v.getName()); + if (EigenVector.getCosAngleError(v.getName()) < 1.0e-3) { + eigenvalues.add(EigenVector.getEigenValue(v.getName())); + } + i++; + } + assertEquals("number of clean eigenvectors", 3, i); + + i = 0; + for (VectorWritable value : new SequenceFileValueIterable<VectorWritable>(cleanEigenvectors2, conf)) { + NamedVector v = (NamedVector) value.get(); + log.info(v.getName()); + eigenVectors2.assignRow(i, v); + newEigenValues.add(EigenVector.getEigenValue(v.getName())); + i++; + } + + Collection<Integer> oldEigensFound = Lists.newArrayList(); + for (int row = 0; row < eigenVectors.numRows(); row++) { + Vector oldEigen = eigenVectors.viewRow(row); + if (oldEigen == null) { + break; + } + for (int newRow = 0; newRow < eigenVectors2.numRows(); newRow++) { + Vector newEigen = eigenVectors2.viewRow(newRow); + if (newEigen != null && oldEigen.dot(newEigen) > 0.9) { + oldEigensFound.add(row); + break; + } + } + } + assertEquals("the number of new eigenvectors", 5, i); + + Collection<Double> oldEigenValuesNotFound = Lists.newArrayList(); + for (double d : eigenvalues) { + boolean found = false; + for (double newD : newEigenValues) { + if (Math.abs((d - newD)/d) < 0.1) { + found = true; + } + } + if (!found) { + oldEigenValuesNotFound.add(d); + } + } + assertEquals("number of old eigenvalues not found: " + + Arrays.toString(oldEigenValuesNotFound.toArray(new Double[oldEigenValuesNotFound.size()])), + 0, oldEigenValuesNotFound.size()); + assertEquals("did not find enough old eigenvectors", 3, oldEigensFound.size()); + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/similarity/TestVectorDistanceSimilarityJob.java ---------------------------------------------------------------------- diff --git 
a/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/similarity/TestVectorDistanceSimilarityJob.java b/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/similarity/TestVectorDistanceSimilarityJob.java new file mode 100644 index 0000000..a8a861c --- /dev/null +++ b/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/similarity/TestVectorDistanceSimilarityJob.java @@ -0,0 +1,238 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.math.hadoop.similarity; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.util.ToolRunner; +import org.apache.mahout.clustering.ClusteringTestUtils; +import org.apache.mahout.common.DummyOutputCollector; +import org.apache.mahout.common.MahoutTestCase; +import org.apache.mahout.common.Pair; +import org.apache.mahout.common.StringTuple; +import org.apache.mahout.common.commandline.DefaultOptionCreator; +import org.apache.mahout.common.distance.EuclideanDistanceMeasure; +import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable; +import org.apache.mahout.math.DenseVector; +import org.apache.mahout.math.NamedVector; +import org.apache.mahout.math.RandomAccessSparseVector; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorWritable; +import org.easymock.EasyMock; +import org.junit.Before; +import org.junit.Test; + +import java.util.Collection; +import java.util.List; +import java.util.Map; + +public class TestVectorDistanceSimilarityJob extends MahoutTestCase { + + private FileSystem fs; + + private static final double[][] REFERENCE = { { 1, 1 }, { 2, 1 }, { 1, 2 }, { 2, 2 }, { 3, 3 }, { 4, 4 }, { 5, 4 }, + { 4, 5 }, { 5, 5 } }; + + private static final double[][] SEEDS = { { 1, 1 }, { 10, 10 } }; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + fs = FileSystem.get(getConfiguration()); + } + + @Test + public void testVectorDistanceMapper() throws Exception { + Mapper<WritableComparable<?>, VectorWritable, StringTuple, DoubleWritable>.Context context = + EasyMock.createMock(Mapper.Context.class); + StringTuple tuple = new StringTuple(); + tuple.add("foo"); + tuple.add("123"); + context.write(tuple, new DoubleWritable(Math.sqrt(2.0))); + tuple = new StringTuple(); + tuple.add("foo2"); + tuple.add("123"); + context.write(tuple, new DoubleWritable(1)); + + EasyMock.replay(context); + + 
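// Expectations were recorded on the mock above; replay() arms it so that EasyMock.verify() below can check the writes the mapper actually performs. +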
Vector vector = new RandomAccessSparseVector(2); + vector.set(0, 2); + vector.set(1, 2); + + VectorDistanceMapper mapper = new VectorDistanceMapper(); + setField(mapper, "measure", new EuclideanDistanceMeasure()); + Collection<NamedVector> seedVectors = Lists.newArrayList(); + Vector seed1 = new RandomAccessSparseVector(2); + seed1.set(0, 1); + seed1.set(1, 1); + Vector seed2 = new RandomAccessSparseVector(2); + seed2.set(0, 2); + seed2.set(1, 1); + + seedVectors.add(new NamedVector(seed1, "foo")); + seedVectors.add(new NamedVector(seed2, "foo2")); + setField(mapper, "seedVectors", seedVectors); + + mapper.map(new IntWritable(123), new VectorWritable(vector), context); + + EasyMock.verify(context); + } + + @Test + public void testVectorDistanceInvertedMapper() throws Exception { + Mapper<WritableComparable<?>, VectorWritable, Text, VectorWritable>.Context context = + EasyMock.createMock(Mapper.Context.class); + Vector expectVec = new DenseVector(new double[]{Math.sqrt(2.0), 1.0}); + context.write(new Text("other"), new VectorWritable(expectVec)); + EasyMock.replay(context); + Vector vector = new NamedVector(new RandomAccessSparseVector(2), "other"); + vector.set(0, 2); + vector.set(1, 2); + + VectorDistanceInvertedMapper mapper = new VectorDistanceInvertedMapper(); + setField(mapper, "measure", new EuclideanDistanceMeasure()); + Collection<NamedVector> seedVectors = Lists.newArrayList(); + Vector seed1 = new RandomAccessSparseVector(2); + seed1.set(0, 1); + seed1.set(1, 1); + Vector seed2 = new RandomAccessSparseVector(2); + seed2.set(0, 2); + seed2.set(1, 1); + + seedVectors.add(new NamedVector(seed1, "foo")); + seedVectors.add(new NamedVector(seed2, "foo2")); + setField(mapper, "seedVectors", seedVectors); + + mapper.map(new IntWritable(123), new VectorWritable(vector), context); + + EasyMock.verify(context); + + } + + @Test + public void testRun() throws Exception { + Path input = getTestTempDirPath("input"); + Path output = getTestTempDirPath("output"); + Path seedsPath = getTestTempDirPath("seeds"); + + List<VectorWritable> points = getPointsWritable(REFERENCE); + List<VectorWritable> seeds = getPointsWritable(SEEDS); + + Configuration conf = getConfiguration(); + ClusteringTestUtils.writePointsToFile(points, true, new Path(input, "file1"), fs, conf); + ClusteringTestUtils.writePointsToFile(seeds, true, new Path(seedsPath, "part-seeds"), fs, conf); + + String[] args = { optKey(DefaultOptionCreator.INPUT_OPTION), input.toString(), + optKey(VectorDistanceSimilarityJob.SEEDS), seedsPath.toString(), optKey(DefaultOptionCreator.OUTPUT_OPTION), + output.toString(), optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION), + EuclideanDistanceMeasure.class.getName() }; + + ToolRunner.run(getConfiguration(), new VectorDistanceSimilarityJob(), args); + + int expectedOutputSize = SEEDS.length * REFERENCE.length; + int outputSize = Iterables.size(new SequenceFileIterable<StringTuple, DoubleWritable>(new Path(output, + "part-m-00000"), conf)); + assertEquals(expectedOutputSize, outputSize); + } + + @Test + public void testMaxDistance() throws Exception { + + Path input = getTestTempDirPath("input"); + Path output = getTestTempDirPath("output"); + Path seedsPath = getTestTempDirPath("seeds"); + + List<VectorWritable> points = getPointsWritable(REFERENCE); + List<VectorWritable> seeds = getPointsWritable(SEEDS); + + Configuration conf = getConfiguration(); + ClusteringTestUtils.writePointsToFile(points, true, new Path(input, "file1"), fs, conf); + ClusteringTestUtils.writePointsToFile(seeds, true, new 
Path(seedsPath, "part-seeds"), fs, conf); + + double maxDistance = 10; + + String[] args = { optKey(DefaultOptionCreator.INPUT_OPTION), input.toString(), + optKey(VectorDistanceSimilarityJob.SEEDS), seedsPath.toString(), optKey(DefaultOptionCreator.OUTPUT_OPTION), + output.toString(), optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION), + EuclideanDistanceMeasure.class.getName(), + optKey(VectorDistanceSimilarityJob.MAX_DISTANCE), String.valueOf(maxDistance) }; + + ToolRunner.run(getConfiguration(), new VectorDistanceSimilarityJob(), args); + + int outputSize = 0; + + for (Pair<StringTuple, DoubleWritable> record : new SequenceFileIterable<StringTuple, DoubleWritable>( + new Path(output, "part-m-00000"), conf)) { + outputSize++; + assertTrue(record.getSecond().get() <= maxDistance); + } + + assertEquals(14, outputSize); + } + + @Test + public void testRunInverted() throws Exception { + Path input = getTestTempDirPath("input"); + Path output = getTestTempDirPath("output"); + Path seedsPath = getTestTempDirPath("seeds"); + List<VectorWritable> points = getPointsWritable(REFERENCE); + List<VectorWritable> seeds = getPointsWritable(SEEDS); + Configuration conf = getConfiguration(); + ClusteringTestUtils.writePointsToFile(points, true, new Path(input, "file1"), fs, conf); + ClusteringTestUtils.writePointsToFile(seeds, true, new Path(seedsPath, "part-seeds"), fs, conf); + String[] args = {optKey(DefaultOptionCreator.INPUT_OPTION), input.toString(), + optKey(VectorDistanceSimilarityJob.SEEDS), seedsPath.toString(), optKey(DefaultOptionCreator.OUTPUT_OPTION), + output.toString(), optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION), + EuclideanDistanceMeasure.class.getName(), + optKey(VectorDistanceSimilarityJob.OUT_TYPE_KEY), "v" + }; + ToolRunner.run(getConfiguration(), new VectorDistanceSimilarityJob(), args); + + DummyOutputCollector<Text, VectorWritable> collector = new DummyOutputCollector<>(); + + for (Pair<Text, VectorWritable> record : new SequenceFileIterable<Text, VectorWritable>( + new Path(output, "part-m-00000"), conf)) { + collector.collect(record.getFirst(), record.getSecond()); + } + assertEquals(REFERENCE.length, collector.getData().size()); + for (Map.Entry<Text, List<VectorWritable>> entry : collector.getData().entrySet()) { + assertEquals(SEEDS.length, entry.getValue().iterator().next().get().size()); + } + } + + private static List<VectorWritable> getPointsWritable(double[][] raw) { + List<VectorWritable> points = Lists.newArrayList(); + for (double[] fr : raw) { + Vector vec = new RandomAccessSparseVector(fr.length); + vec.assign(fr); + points.add(new VectorWritable(vec)); + } + return points; + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/RowSimilarityJobTest.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/RowSimilarityJobTest.java b/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/RowSimilarityJobTest.java new file mode 100644 index 0000000..5d64f90 --- /dev/null +++ b/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/RowSimilarityJobTest.java @@ -0,0 +1,214 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.math.hadoop.similarity.cooccurrence; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.mahout.common.MahoutTestCase; +import org.apache.mahout.math.Matrix; +import org.apache.mahout.math.hadoop.MathHelper; +import org.apache.mahout.math.hadoop.similarity.cooccurrence.measures.TanimotoCoefficientSimilarity; +import org.apache.mahout.math.map.OpenIntIntHashMap; +import org.junit.Test; + +import java.io.File; + +public class RowSimilarityJobTest extends MahoutTestCase { + + /** + * integration test with a tiny data set + * + * <pre> + * + * input matrix: + * + * 1, 0, 1, 1, 0 + * 0, 0, 1, 1, 0 + * 0, 0, 0, 0, 1 + * + * similarity matrix (via tanimoto): + * + * 1, 0.666, 0 + * 0.666, 1, 0 + * 0, 0, 1 + * </pre> + * @throws Exception + */ + @Test + public void toyIntegration() throws Exception { + + File inputFile = getTestTempFile("rows"); + File outputDir = getTestTempDir("output"); + outputDir.delete(); + File tmpDir = getTestTempDir("tmp"); + + Configuration conf = getConfiguration(); + Path inputPath = new Path(inputFile.getAbsolutePath()); + FileSystem fs = FileSystem.get(inputPath.toUri(), conf); + + MathHelper.writeDistributedRowMatrix(new double[][] { + new double[] { 1, 0, 1, 1, 0 }, + new double[] { 0, 0, 1, 1, 0 }, + new double[] { 0, 0, 0, 0, 1 } }, + fs, conf, inputPath); + + RowSimilarityJob rowSimilarityJob = new RowSimilarityJob(); + rowSimilarityJob.setConf(conf); + rowSimilarityJob.run(new String[] { "--input", inputFile.getAbsolutePath(), "--output", outputDir.getAbsolutePath(), + "--numberOfColumns", String.valueOf(5), "--similarityClassname", TanimotoCoefficientSimilarity.class.getName(), + "--tempDir", tmpDir.getAbsolutePath() }); + + + OpenIntIntHashMap observationsPerColumn = + Vectors.readAsIntMap(new Path(tmpDir.getAbsolutePath(), "observationsPerColumn.bin"), conf); + assertEquals(4, observationsPerColumn.size()); + assertEquals(1, observationsPerColumn.get(0)); + assertEquals(2, observationsPerColumn.get(2)); + assertEquals(2, observationsPerColumn.get(3)); + assertEquals(1, observationsPerColumn.get(4)); + + Matrix similarityMatrix = MathHelper.readMatrix(conf, new Path(outputDir.getAbsolutePath(), "part-r-00000"), 3, 3); + + assertNotNull(similarityMatrix); + assertEquals(3, similarityMatrix.numCols()); + assertEquals(3, similarityMatrix.numRows()); + + assertEquals(1.0, similarityMatrix.get(0, 0), EPSILON); + assertEquals(1.0, similarityMatrix.get(1, 1), EPSILON); + assertEquals(1.0, similarityMatrix.get(2, 2), EPSILON); + assertEquals(0.0, similarityMatrix.get(2, 0), EPSILON); + assertEquals(0.0, similarityMatrix.get(2, 1), EPSILON); + assertEquals(0.0, similarityMatrix.get(0, 2), EPSILON); + assertEquals(0.0, similarityMatrix.get(1, 2), EPSILON); + 
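// Tanimoto for rows 0 and 1: they co-occur in columns 2 and 3, and have 3 and 2 nonzero entries respectively, so 2 / (3 + 2 - 2) = 2/3. +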
assertEquals(0.666666, similarityMatrix.get(0, 1), EPSILON); + assertEquals(0.666666, similarityMatrix.get(1, 0), EPSILON); + } + + @Test + public void toyIntegrationMaxSimilaritiesPerRow() throws Exception { + + File inputFile = getTestTempFile("rows"); + File outputDir = getTestTempDir("output"); + outputDir.delete(); + File tmpDir = getTestTempDir("tmp"); + + Configuration conf = getConfiguration(); + Path inputPath = new Path(inputFile.getAbsolutePath()); + FileSystem fs = FileSystem.get(inputPath.toUri(), conf); + + MathHelper.writeDistributedRowMatrix(new double[][]{ + new double[] { 1, 0, 1, 1, 0, 1 }, + new double[] { 0, 1, 1, 1, 1, 1 }, + new double[] { 1, 1, 0, 1, 0, 0 } }, + fs, conf, inputPath); + + RowSimilarityJob rowSimilarityJob = new RowSimilarityJob(); + rowSimilarityJob.setConf(conf); + rowSimilarityJob.run(new String[] { "--input", inputFile.getAbsolutePath(), "--output", outputDir.getAbsolutePath(), + "--numberOfColumns", String.valueOf(6), "--similarityClassname", TanimotoCoefficientSimilarity.class.getName(), + "--maxSimilaritiesPerRow", String.valueOf(1), "--excludeSelfSimilarity", String.valueOf(true), + "--tempDir", tmpDir.getAbsolutePath() }); + + Matrix similarityMatrix = MathHelper.readMatrix(conf, new Path(outputDir.getAbsolutePath(), "part-r-00000"), 3, 3); + + assertNotNull(similarityMatrix); + assertEquals(3, similarityMatrix.numCols()); + assertEquals(3, similarityMatrix.numRows()); + + assertEquals(0.0, similarityMatrix.get(0, 0), EPSILON); + assertEquals(0.5, similarityMatrix.get(0, 1), EPSILON); + assertEquals(0.0, similarityMatrix.get(0, 2), EPSILON); + + assertEquals(0.5, similarityMatrix.get(1, 0), EPSILON); + assertEquals(0.0, similarityMatrix.get(1, 1), EPSILON); + assertEquals(0.0, similarityMatrix.get(1, 2), EPSILON); + + assertEquals(0.4, similarityMatrix.get(2, 0), EPSILON); + assertEquals(0.0, similarityMatrix.get(2, 1), EPSILON); + assertEquals(0.0, similarityMatrix.get(2, 2), EPSILON); + } + + @Test + public void toyIntegrationWithThreshold() throws Exception { + + + File inputFile = getTestTempFile("rows"); + File outputDir = getTestTempDir("output"); + outputDir.delete(); + File tmpDir = getTestTempDir("tmp"); + + Configuration conf = getConfiguration(); + Path inputPath = new Path(inputFile.getAbsolutePath()); + FileSystem fs = FileSystem.get(inputPath.toUri(), conf); + + MathHelper.writeDistributedRowMatrix(new double[][]{ + new double[] { 1, 0, 1, 1, 0, 1 }, + new double[] { 0, 1, 1, 1, 1, 1 }, + new double[] { 1, 1, 0, 1, 0, 0 } }, + fs, conf, inputPath); + + RowSimilarityJob rowSimilarityJob = new RowSimilarityJob(); + rowSimilarityJob.setConf(conf); + rowSimilarityJob.run(new String[] { "--input", inputFile.getAbsolutePath(), "--output", outputDir.getAbsolutePath(), + "--numberOfColumns", String.valueOf(6), "--similarityClassname", TanimotoCoefficientSimilarity.class.getName(), + "--excludeSelfSimilarity", String.valueOf(true), "--threshold", String.valueOf(0.5), + "--tempDir", tmpDir.getAbsolutePath() }); + + Matrix similarityMatrix = MathHelper.readMatrix(conf, new Path(outputDir.getAbsolutePath(), "part-r-00000"), 3, 3); + + assertNotNull(similarityMatrix); + assertEquals(3, similarityMatrix.numCols()); + assertEquals(3, similarityMatrix.numRows()); + + assertEquals(0.0, similarityMatrix.get(0, 0), EPSILON); + assertEquals(0.5, similarityMatrix.get(0, 1), EPSILON); + assertEquals(0.0, similarityMatrix.get(0, 2), EPSILON); + + assertEquals(0.5, similarityMatrix.get(1, 0), EPSILON); + assertEquals(0.0, similarityMatrix.get(1, 1), 
EPSILON); + assertEquals(0.0, similarityMatrix.get(1, 2), EPSILON); + + assertEquals(0.0, similarityMatrix.get(2, 0), EPSILON); + assertEquals(0.0, similarityMatrix.get(2, 1), EPSILON); + assertEquals(0.0, similarityMatrix.get(2, 2), EPSILON); + } + + @Test + public void testVectorDimensions() throws Exception { + + File inputFile = getTestTempFile("rows"); + + Configuration conf = getConfiguration(); + Path inputPath = new Path(inputFile.getAbsolutePath()); + FileSystem fs = FileSystem.get(inputPath.toUri(), conf); + + MathHelper.writeDistributedRowMatrix(new double[][] { + new double[] { 1, 0, 1, 1, 0, 1 }, + new double[] { 0, 1, 1, 1, 1, 1 }, + new double[] { 1, 1, 0, 1, 0, 0 } }, + fs, conf, inputPath); + + RowSimilarityJob rowSimilarityJob = new RowSimilarityJob(); + rowSimilarityJob.setConf(conf); + + int numberOfColumns = rowSimilarityJob.getDimensions(inputPath); + + assertEquals(6, numberOfColumns); + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/measures/VectorSimilarityMeasuresTest.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/measures/VectorSimilarityMeasuresTest.java b/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/measures/VectorSimilarityMeasuresTest.java new file mode 100644 index 0000000..c8a8c51 --- /dev/null +++ b/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/measures/VectorSimilarityMeasuresTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.math.hadoop.similarity.cooccurrence.measures; + +import org.apache.mahout.common.ClassUtils; +import org.apache.mahout.common.MahoutTestCase; +import org.apache.mahout.math.DenseVector; +import org.apache.mahout.math.RandomAccessSparseVector; +import org.apache.mahout.math.SequentialAccessSparseVector; +import org.apache.mahout.math.Vector; +import org.junit.Test; + +public class VectorSimilarityMeasuresTest extends MahoutTestCase { + + static double distributedSimilarity(double[] one, + double[] two, + Class<? 
extends VectorSimilarityMeasure> similarityMeasureClass) { + double rand = computeSimilarity(one, two, similarityMeasureClass, new RandomAccessSparseVector(one.length)); + double seq = computeSimilarity(one, two, similarityMeasureClass, new SequentialAccessSparseVector(one.length)); + double dense = computeSimilarity(one, two, similarityMeasureClass, new DenseVector(one.length)); + assertEquals(seq, rand, 1.0e-10); + assertEquals(seq, dense, 1.0e-10); + assertEquals(dense, rand, 1.0e-10); + return seq; + } + + private static double computeSimilarity(double[] one, double[] two, + Class<? extends VectorSimilarityMeasure> similarityMeasureClass, + Vector like) { + VectorSimilarityMeasure similarityMeasure = ClassUtils.instantiateAs(similarityMeasureClass, + VectorSimilarityMeasure.class); + Vector oneNormalized = similarityMeasure.normalize(asVector(one, like)); + Vector twoNormalized = similarityMeasure.normalize(asVector(two, like)); + + double normOne = similarityMeasure.norm(oneNormalized); + double normTwo = similarityMeasure.norm(twoNormalized); + + double dot = 0; + for (int n = 0; n < one.length; n++) { + if (oneNormalized.get(n) != 0 && twoNormalized.get(n) != 0) { + dot += similarityMeasure.aggregate(oneNormalized.get(n), twoNormalized.get(n)); + } + } + + return similarityMeasure.similarity(dot, normOne, normTwo, one.length); + } + + static Vector asVector(double[] values, Vector like) { + Vector vector = like.like(); + for (int dim = 0; dim < values.length; dim++) { + vector.set(dim, values[dim]); + } + return vector; + } + + @Test + public void testCooccurrenceCountSimilarity() { + double similarity = distributedSimilarity( + new double[] { 0, 1, 0, 0, 1, 1, 0, 1, 0, 1, 1, 1, 0 }, + new double[] { 1, 0, 0, 0, 1, 0, 1, 1, 1, 1, 1, 1, 1 }, CooccurrenceCountSimilarity.class); + + assertEquals(5.0, similarity, 0); + } + + @Test + public void testTanimotoCoefficientSimilarity() { + double similarity = distributedSimilarity( + new double[] { 0, 1, 0, 0, 1, 1, 0, 1, 0, 1, 1, 1, 0 }, + new double[] { 1, 0, 0, 0, 1, 0, 1, 1, 1, 1, 1, 1, 1 }, TanimotoCoefficientSimilarity.class); + + assertEquals(0.454545455, similarity, EPSILON); + } + + @Test + public void testCityblockSimilarity() { + double similarity = distributedSimilarity( + new double[] { 0, 1, 0, 0, 1, 1, 0, 1, 0, 1, 1, 1, 0 }, + new double[] { 1, 0, 0, 0, 1, 0, 1, 1, 1, 1, 1, 1, 1 }, CityBlockSimilarity.class); + + assertEquals(0.142857143, similarity, EPSILON); + } + + @Test + public void testLoglikelihoodSimilarity() { + double similarity = distributedSimilarity( + new double[] { 0, 1, 0, 0, 1, 1, 0, 1, 0, 1, 1, 1, 0 }, + new double[] { 1, 0, 0, 0, 1, 0, 1, 1, 1, 1, 1, 1, 1 }, LoglikelihoodSimilarity.class); + + assertEquals(0.03320155369284261, similarity, EPSILON); + } + + @Test + public void testCosineSimilarity() { + double similarity = distributedSimilarity( + new double[] { 0, 2, 0, 0, 8, 3, 0, 6, 0, 1, 2, 2, 0 }, + new double[] { 3, 0, 0, 0, 7, 0, 2, 2, 1, 3, 2, 1, 1 }, CosineSimilarity.class); + + assertEquals(0.769846046, similarity, EPSILON); + } + + @Test + public void testPearsonCorrelationSimilarity() { + double similarity = distributedSimilarity( + new double[] { 0, 2, 0, 0, 8, 3, 0, 6, 0, 1, 1, 2, 1 }, + new double[] { 3, 0, 0, 0, 7, 0, 2, 2, 1, 3, 2, 4, 3 }, PearsonCorrelationSimilarity.class); + + assertEquals(0.5303300858899108, similarity, EPSILON); + } + + @Test + public void testEuclideanDistanceSimilarity() { + double similarity = distributedSimilarity( + new double[] { 0, 2, 0, 0, 8, 3, 0, 6, 0, 1, 1, 2, 
1 }, + new double[] { 3, 0, 0, 0, 7, 0, 2, 2, 1, 3, 2, 4, 4 }, EuclideanDistanceSimilarity.class); + + assertEquals(0.11268865367232477, similarity, EPSILON); + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/solver/TestDistributedConjugateGradientSolver.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/solver/TestDistributedConjugateGradientSolver.java b/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/solver/TestDistributedConjugateGradientSolver.java new file mode 100644 index 0000000..e8487ad --- /dev/null +++ b/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/solver/TestDistributedConjugateGradientSolver.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.math.hadoop.solver; + +import java.io.File; +import java.util.Random; + +import org.apache.mahout.common.MahoutTestCase; +import org.apache.mahout.common.RandomUtils; +import org.apache.mahout.math.DenseVector; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.hadoop.DistributedRowMatrix; +import org.apache.mahout.math.hadoop.TestDistributedRowMatrix; +import org.junit.Test; + +public final class TestDistributedConjugateGradientSolver extends MahoutTestCase { + + private static Vector randomVector(int size, double entryMean) { + DenseVector v = new DenseVector(size); + Random r = RandomUtils.getRandom(); + + for (int i = 0; i < size; ++i) { + v.setQuick(i, r.nextGaussian() * entryMean); + } + + return v; + } + + @Test + public void testSolver() throws Exception { + File testData = getTestTempDir("testdata"); + DistributedRowMatrix matrix = new TestDistributedRowMatrix().randomDistributedMatrix( + 10, 10, 10, 10, 10.0, true, testData.getAbsolutePath()); + matrix.setConf(getConfiguration()); + Vector vector = randomVector(matrix.numCols(), 10.0); + + DistributedConjugateGradientSolver solver = new DistributedConjugateGradientSolver(); + Vector x = solver.solve(matrix, vector); + + Vector solvedVector = matrix.times(x); + double distance = Math.sqrt(vector.getDistanceSquared(solvedVector)); + assertEquals(0.0, distance, EPSILON); + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/solver/TestDistributedConjugateGradientSolverCLI.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/solver/TestDistributedConjugateGradientSolverCLI.java 
b/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/solver/TestDistributedConjugateGradientSolverCLI.java new file mode 100644 index 0000000..3ac9405 --- /dev/null +++ b/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/solver/TestDistributedConjugateGradientSolverCLI.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.math.hadoop.solver; + +import java.io.IOException; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.util.ToolRunner; +import org.apache.mahout.common.MahoutTestCase; +import org.apache.mahout.common.RandomUtils; +import org.apache.mahout.math.DenseVector; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorWritable; +import org.apache.mahout.math.hadoop.DistributedRowMatrix; +import org.apache.mahout.math.hadoop.TestDistributedRowMatrix; +import org.junit.Test; + +public final class TestDistributedConjugateGradientSolverCLI extends MahoutTestCase { + + private static Vector randomVector(int size, double entryMean) { + Vector v = new DenseVector(size); + Random r = RandomUtils.getRandom(); + for (int i = 0; i < size; ++i) { + v.setQuick(i, r.nextGaussian() * entryMean); + } + return v; + } + + private static Path saveVector(Configuration conf, Path path, Vector v) throws IOException { + FileSystem fs = path.getFileSystem(conf); + SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path, IntWritable.class, VectorWritable.class); + + try { + writer.append(new IntWritable(0), new VectorWritable(v)); + } finally { + writer.close(); + } + return path; + } + + private static Vector loadVector(Configuration conf, Path path) throws IOException { + FileSystem fs = path.getFileSystem(conf); + SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf); + Writable key = new IntWritable(); + VectorWritable value = new VectorWritable(); + + try { + if (!reader.next(key, value)) { + throw new IOException("Input vector file is empty."); + } + return value.get(); + } finally { + reader.close(); + } + } + + @Test + public void testSolver() throws Exception { + Configuration conf = getConfiguration(); + Path testData = getTestTempDirPath("testdata"); + DistributedRowMatrix matrix = new TestDistributedRowMatrix().randomDistributedMatrix( + 10, 10, 10, 10, 10.0, true, testData.toString()); + matrix.setConf(conf); + Path output = getTestTempFilePath("output"); + Path vectorPath = getTestTempFilePath("vector"); + Path tempPath = getTestTempDirPath("tmp"); + + Vector vector = 
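/*
 * (Editor's note.) Both this CLI test and the in-process testSolver above assert the
 * same property: if x solves A x = b, then matrix.times(x) must reproduce the input
 * vector within EPSILON. Conjugate gradient only guarantees this when A is symmetric
 * positive definite, which is why randomDistributedMatrix is called with
 * symmetric = true. A minimal in-memory sketch of the same algorithm and check, using
 * plain arrays and hand-rolled helpers rather than Mahout's DistributedRowMatrix API:
 *
 *   static double dot(double[] u, double[] v) {
 *     double s = 0;
 *     for (int i = 0; i < u.length; i++) s += u[i] * v[i];
 *     return s;
 *   }
 *
 *   static double[] times(double[][] a, double[] v) {
 *     double[] out = new double[a.length];
 *     for (int i = 0; i < a.length; i++) out[i] = dot(a[i], v);
 *     return out;
 *   }
 *
 *   // Solves a*x = b for symmetric positive definite a by conjugate gradient.
 *   static double[] conjugateGradient(double[][] a, double[] b, int maxIters) {
 *     int n = b.length;
 *     double[] x = new double[n];      // start from x = 0
 *     double[] r = b.clone();          // residual b - a*x
 *     double[] p = r.clone();          // search direction
 *     double rs = dot(r, r);
 *     for (int it = 0; it < maxIters && rs > 1.0e-20; it++) {
 *       double[] ap = times(a, p);
 *       double alpha = rs / dot(p, ap);
 *       for (int i = 0; i < n; i++) { x[i] += alpha * p[i]; r[i] -= alpha * ap[i]; }
 *       double rsNew = dot(r, r);
 *       for (int i = 0; i < n; i++) p[i] = r[i] + (rsNew / rs) * p[i];
 *       rs = rsNew;
 *     }
 *     return x;
 *   }
 *
 * The sqrt(getDistanceSquared(...)) asserted at the end of each test is exactly the
 * final residual norm sqrt(dot(r, r)) of such a solve.
 */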
randomVector(matrix.numCols(), 10.0); + saveVector(conf, vectorPath, vector); + + String[] args = { + "-i", matrix.getRowPath().toString(), + "-o", output.toString(), + "--tempDir", tempPath.toString(), + "--vector", vectorPath.toString(), + "--numRows", "10", + "--numCols", "10", + "--symmetric", "true" + }; + + DistributedConjugateGradientSolver solver = new DistributedConjugateGradientSolver(); + ToolRunner.run(getConfiguration(), solver.job(), args); + + Vector x = loadVector(conf, output); + + Vector solvedVector = matrix.times(x); + double distance = Math.sqrt(vector.getDistanceSquared(solvedVector)); + assertEquals(0.0, distance, EPSILON); + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/stats/BasicStatsTest.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/stats/BasicStatsTest.java b/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/stats/BasicStatsTest.java new file mode 100644 index 0000000..7e59eb4 --- /dev/null +++ b/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/stats/BasicStatsTest.java @@ -0,0 +1,121 @@ +package org.apache.mahout.math.hadoop.stats; +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.mahout.common.MahoutTestCase; +import org.apache.mahout.common.RandomUtils; +import org.apache.mahout.math.jet.random.Normal; +import org.junit.Before; +import org.junit.Test; + +import java.util.Random; + +public final class BasicStatsTest extends MahoutTestCase { + + private Configuration conf; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + conf = getConfiguration(); + } + + @Test + public void testVar() throws Exception { + Path input = getTestTempFilePath("stdDev/counts.file"); + Path output = getTestTempFilePath("stdDev/output.file"); + + produceTestData(input); + + double v = BasicStats.variance(input, output, conf); + assertEquals(2.44, v, 0.01); + } + + + @Test + public void testStdDev() throws Exception { + Path input = getTestTempFilePath("stdDev/counts.file"); + Path output = getTestTempFilePath("stdDev/output.file"); + + produceTestData(input); + + double v = BasicStats.stdDev(input, output, conf); + assertEquals(1.56, v, 0.01); //sample std dev is 1.563, std. 
dev from a discrete set is 1.48 + + } + + @Test + public void testStdDevForGivenMean() throws Exception { + Path input = getTestTempFilePath("stdDev/counts.file"); + Path output = getTestTempFilePath("stdDev/output.file"); + + produceTestData(input); + + double v = BasicStats.stdDevForGivenMean(input, output, 0.0D, conf); + assertEquals(10.65, v, 0.01); //sample std dev is 10.65 + + } + + private void produceTestData(Path input) throws Exception { + FileSystem fs = FileSystem.get(input.toUri(), conf); + SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, input, IntWritable.class, DoubleWritable.class); + //Random random = new MersenneTwisterRNG(); + /*Normal normal = new Normal(5, 3, random); + for (int i = 0; i < 10000; i++) { + writer.append(new IntWritable(i), new DoubleWritable((long)normal.nextDouble())); + }*/ + int i = 0; + writer.append(new IntWritable(i++), new DoubleWritable(7)); + writer.append(new IntWritable(i++), new DoubleWritable(9)); + writer.append(new IntWritable(i++), new DoubleWritable(9)); + writer.append(new IntWritable(i++), new DoubleWritable(10)); + writer.append(new IntWritable(i++), new DoubleWritable(10)); + writer.append(new IntWritable(i++), new DoubleWritable(10)); + writer.append(new IntWritable(i++), new DoubleWritable(10)); + writer.append(new IntWritable(i++), new DoubleWritable(11)); + writer.append(new IntWritable(i++), new DoubleWritable(11)); + writer.append(new IntWritable(i++), new DoubleWritable(13)); + writer.close(); + } + + //Not entirely sure on this test + @Test + public void testStdDev2() throws Exception { + Path input = getTestTempFilePath("stdDev/counts.file"); + Path output = getTestTempFilePath("stdDev/output.file"); + FileSystem fs = FileSystem.get(input.toUri(), conf); + SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, input, IntWritable.class, + DoubleWritable.class); + Random random = RandomUtils.getRandom(); + Normal normal = new Normal(5, 3, random); + for (int i = 0; i < 1000000; i++) { + writer.append(new IntWritable(i), new DoubleWritable((long) normal.nextInt())); + } + writer.close(); + double v = BasicStats.stdDev(input, output, conf); + assertEquals(3, v, 0.02); + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDPCASparseTest.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDPCASparseTest.java b/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDPCASparseTest.java new file mode 100644 index 0000000..6a194dd --- /dev/null +++ b/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDPCASparseTest.java @@ -0,0 +1,296 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
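Aside on the BasicStats expectations above: all three asserted constants can be checked by hand from the ten counts {7, 9, 9, 10, 10, 10, 10, 11, 11, 13} that produceTestData writes. A plain-Java sketch (no Hadoop; it assumes, consistently with the asserted values, that BasicStats uses the sample n-1 convention):

    double[] xs = {7, 9, 9, 10, 10, 10, 10, 11, 11, 13};
    double sum = 0;
    double sumSq = 0;
    for (double x : xs) {
      sum += x;
      sumSq += x * x;
    }
    double mean = sum / xs.length;                                    // 100 / 10 = 10.0
    double var = (sumSq - xs.length * mean * mean) / (xs.length - 1); // 22 / 9 ~ 2.444  -> testVar
    double sd = Math.sqrt(var);                                       // ~ 1.563         -> testStdDev
    double populationSd = Math.sqrt(22.0 / xs.length);                // ~ 1.483, the "1.48" in the comment
    double sdAboutZero = Math.sqrt(sumSq / (xs.length - 1));          // sqrt(1022/9) ~ 10.656 -> testStdDevForGivenMean

testStdDev2 then checks the other direction: a million integer-rounded draws from Normal(5, 3) should show a standard deviation near 3.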
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.math.hadoop.stochasticsvd; + +import com.google.common.collect.Lists; +import com.google.common.io.Closeables; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.SequenceFile.CompressionType; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.compress.DefaultCodec; +import org.apache.mahout.common.IOUtils; +import org.apache.mahout.common.MahoutTestCase; +import org.apache.mahout.common.Pair; +import org.apache.mahout.common.RandomUtils; +import org.apache.mahout.math.*; +import org.apache.mahout.math.function.DoubleFunction; +import org.apache.mahout.math.function.Functions; +import org.apache.mahout.math.function.VectorFunction; +import org.junit.Test; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.util.Deque; +import java.util.Iterator; +import java.util.Random; + +public class LocalSSVDPCASparseTest extends MahoutTestCase { + + private static final double s_epsilon = 1.0E-10d; + + @Test + public void testOmegaTRightMultiply() { + final Random rnd = RandomUtils.getRandom(); + final long seed = rnd.nextLong(); + final int n = 2000; + + final int kp = 100; + + final Omega omega = new Omega(seed, kp); + final Matrix materializedOmega = new DenseMatrix(n, kp); + for (int i = 0; i < n; i++) + for (int j = 0; j < kp; j++) + materializedOmega.setQuick(i, j, omega.getQuick(i, j)); + Vector xi = new DenseVector(n); + xi.assign(new DoubleFunction() { + @Override + public double apply(double x) { + return rnd.nextDouble() * 100; + } + }); + + Vector s_o = omega.mutlithreadedTRightMultiply(xi); + + Matrix xiVector = new DenseMatrix(n, 1); + xiVector.assignColumn(0, xi); + + Vector s_o_control = materializedOmega.transpose().times(xiVector).viewColumn(0); + + assertEquals(0, s_o.minus(s_o_control).aggregate(Functions.PLUS, Functions.ABS), 1e-10); + + System.out.printf("s_omega=\n%s\n", s_o); + System.out.printf("s_omega_control=\n%s\n", s_o_control); + } + + @Test + public void runPCATest1() throws IOException { + runSSVDSolver(1); + } + +// @Test + public void runPCATest0() throws IOException { + runSSVDSolver(0); + } + + + public void runSSVDSolver(int q) throws IOException { + + Configuration conf = new Configuration(); + conf.set("mapred.job.tracker", "local"); + conf.set("fs.default.name", "file:///"); + + // conf.set("mapred.job.tracker","localhost:11011"); + // conf.set("fs.default.name","hdfs://localhost:11010/"); + + Deque<Closeable> closeables = Lists.newLinkedList(); + try { + Random rnd = RandomUtils.getRandom(); + + File tmpDir = getTestTempDir("svdtmp"); + conf.set("hadoop.tmp.dir", tmpDir.getAbsolutePath()); + + Path aLocPath = new Path(getTestTempDirPath("svdtmp/A"), "A.seq"); + + // create distributed row matrix-like struct + SequenceFile.Writer w = + SequenceFile.createWriter(FileSystem.getLocal(conf), + conf, + aLocPath, + Text.class, + VectorWritable.class, + CompressionType.BLOCK, + new DefaultCodec()); + closeables.addFirst(w); + + int n = 100; + int m = 2000; + double percent = 5; + + VectorWritable vw = new VectorWritable(); + Text rkey = new Text(); + + Vector xi = new DenseVector(n); + + double muAmplitude = 50.0; + for (int i = 0; i < m; i++) { + Vector dv = new SequentialAccessSparseVector(n); + 
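/*
 * (Editor's note on the loop around this point.) Each iteration writes one row of the
 * distributed-row-matrix-like input: a SequentialAccessSparseVector with about
 * (percent/100) * n randomly placed entries, wrapped in a NamedVector so that name
 * propagation through the U*Sigma output can be asserted at the end of the test,
 * while xi accumulates the running column sum and becomes the pseudo-PCA mean once
 * divided by m after the loop. Note that the vector name is "row-" + i but the
 * sequence-file key is set to "row-i" + i; nothing downstream compares the two, so
 * the mismatch is cosmetic here.
 */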
String rowname = "row-"+i; + NamedVector namedRow = new NamedVector(dv, rowname); + for (int j = 0; j < n * percent / 100; j++) { + dv.setQuick(rnd.nextInt(n), muAmplitude * (rnd.nextDouble() - 0.25)); + } + rkey.set("row-i"+i); + vw.set(namedRow); + w.append(rkey, vw); + xi.assign(dv, Functions.PLUS); + } + closeables.remove(w); + Closeables.close(w, false); + + xi.assign(Functions.mult(1.0 / m)); + + FileSystem fs = FileSystem.get(conf); + + Path tempDirPath = getTestTempDirPath("svd-proc"); + Path aPath = new Path(tempDirPath, "A/A.seq"); + fs.copyFromLocalFile(aLocPath, aPath); + Path xiPath = new Path(tempDirPath, "xi/xi.seq"); + SSVDHelper.saveVector(xi, xiPath, conf); + + Path svdOutPath = new Path(tempDirPath, "SSVD-out"); + + // make sure we wipe out previous test results, just a convenience + fs.delete(svdOutPath, true); + + // Solver starts here: + System.out.println("Input prepared, starting solver..."); + + int ablockRows = 867; + int p = 60; + int k = 40; + SSVDSolver ssvd = + new SSVDSolver(conf, + new Path[]{aPath}, + svdOutPath, + ablockRows, + k, + p, + 3); + ssvd.setOuterBlockHeight(500); + ssvd.setAbtBlockHeight(251); + ssvd.setPcaMeanPath(xiPath); + + /* + * Removing V,U jobs from this test to reduce running time. i will keep them + * put in the dense test though. + * + * For PCA test, we also want to request U*Sigma output and check it for named + * vector propagation. + */ + ssvd.setComputeU(false); + ssvd.setComputeV(false); + ssvd.setcUSigma(true); + + ssvd.setOverwrite(true); + ssvd.setQ(q); + ssvd.setBroadcast(true); + ssvd.run(); + + Vector stochasticSValues = ssvd.getSingularValues(); + + // try to run the same thing without stochastic algo + Matrix a = SSVDHelper.drmLoadAsDense(fs, aPath, conf); + + verifyInternals(svdOutPath, a, new Omega(ssvd.getOmegaSeed(), k + p), k + p, q); + + // subtract pseudo pca mean + for (int i = 0; i < m; i++) { + a.viewRow(i).assign(xi, Functions.MINUS); + } + + SingularValueDecomposition svd2 = + new SingularValueDecomposition(a); + + Vector svalues2 = new DenseVector(svd2.getSingularValues()); + + System.out.println("--SSVD solver singular values:"); + LocalSSVDSolverSparseSequentialTest.dumpSv(stochasticSValues); + System.out.println("--SVD solver singular values:"); + LocalSSVDSolverSparseSequentialTest.dumpSv(svalues2); + + for (int i = 0; i < k + p; i++) { + assertTrue(Math.abs(svalues2.getQuick(i) - stochasticSValues.getQuick(i)) <= s_epsilon); + } + + DenseMatrix mQ = + SSVDHelper.drmLoadAsDense(fs, new Path(svdOutPath, "Bt-job/" + + BtJob.OUTPUT_Q + "-*"), conf); + + SSVDCommonTest.assertOrthonormality(mQ, + false, + s_epsilon); + + // assert name propagation + for (Iterator<Pair<Writable, Vector>> iter = SSVDHelper.drmIterator(fs, + new Path(ssvd.getuSigmaPath()+"/*"), + conf, + closeables); iter.hasNext(); ) { + Pair<Writable, Vector> pair = iter.next(); + Writable key = pair.getFirst(); + Vector v = pair.getSecond(); + + assertTrue(v instanceof NamedVector); + assertTrue(key instanceof Text); + } + + } finally { + IOUtils.close(closeables); + } + } + + private void verifyInternals(Path tempDir, Matrix a, Omega omega, int kp, int q) { + int m = a.numRows(); + int n = a.numCols(); + + Vector xi = a.aggregateColumns(new VectorFunction() { + @Override + public double apply(Vector v) { + return v.zSum() / v.size(); + } + }); + + // materialize omega + Matrix momega = new DenseMatrix(n, kp); + for (int i = 0; i < n; i++) + for (int j = 0; j < kp; j++) + momega.setQuick(i, j, omega.getQuick(i, j)); + + Vector s_o = 
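/*
 * (Editor's note.) This is the heart of the PCA trick verifyInternals re-derives:
 * subtracting the mean xi from every row of A would destroy sparsity, so the solver
 * keeps A sparse and corrects the projection algebraically,
 *
 *   (A - 1 xi') Omega  =  A Omega - 1 (Omega' xi)'
 *
 * where 1 is the m-vector of ones. The s_o computed next is the correction term
 * Omega' xi, with components s_o[j] = sum_i Omega[i][j] * xi[i] -- the same quantity
 * testOmegaTRightMultiply checked against a materialized Omega (the transposed
 * "mutlithreaded" spelling is the actual method name). The rows of Y = A*Omega then
 * have s_o subtracted before the QR step.
 */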
omega.mutlithreadedTRightMultiply(xi); + + System.out.printf("s_omega=\n%s\n", s_o); + + Matrix y = a.times(momega); + for (int i = 0; i < n; i++) y.viewRow(i).assign(s_o, Functions.MINUS); + + QRDecomposition qr = new QRDecomposition(y); + Matrix qm = qr.getQ(); + + Vector s_q = qm.aggregateColumns(new VectorFunction() { + @Override + public double apply(Vector v) { + return v.zSum(); + } + }); + + System.out.printf("s_q=\n%s\n", s_q); + + Matrix b = qm.transpose().times(a); + + Vector s_b = b.times(xi); + + System.out.printf("s_b=\n%s\n", s_b); + + + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverDenseTest.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverDenseTest.java b/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverDenseTest.java new file mode 100644 index 0000000..784c7a5 --- /dev/null +++ b/community/mahout-mr/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverDenseTest.java @@ -0,0 +1,206 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.math.hadoop.stochasticsvd; + +import java.io.File; +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.mahout.common.MahoutTestCase; +import org.apache.mahout.math.DenseMatrix; +import org.apache.mahout.math.DenseVector; +import org.apache.mahout.math.Vector; +import org.junit.Test; + +/** + * + * Tests SSVD solver with a made-up data running hadoop solver in a local mode. + * It requests full-rank SSVD and then compares singular values to that of + * Colt's SVD asserting epsilon(precision) 1e-10 or whatever most recent value + * configured. + * + */ +public class LocalSSVDSolverDenseTest extends MahoutTestCase { + + private static final double s_epsilon = 1.0E-10d; + + /* + * I actually never saw errors more than 3% worst case for this particular + * test, but since it's non-deterministic test, it still may occasionally + * produce bad results with a non-zero probability, so i put this pct% for + * error margin high enough so it (almost) never fails. + */ + private static final double s_precisionPct = 10; + + @Test + public void testSSVDSolverDense() throws IOException { + runSSVDSolver(0); + } + + @Test + public void testSSVDSolverPowerIterations1() throws IOException { + runSSVDSolver(1); + } + + // remove from active tests to save time. 
+ /* + @Test + public void testSSVDSolverPowerIterations2() throws IOException { + runSSVDSolver(2); + } + */ + + public void runSSVDSolver(int q) throws IOException { + + Configuration conf = getConfiguration(); + conf.set("mapred.job.tracker", "local"); + conf.set("fs.default.name", "file:///"); + + // conf.set("mapred.job.tracker","localhost:11011"); + // conf.set("fs.default.name","hdfs://localhost:11010/"); + + File tmpDir = getTestTempDir("svdtmp"); + conf.set("hadoop.tmp.dir", tmpDir.getAbsolutePath()); + + Path aLocPath = new Path(getTestTempDirPath("svdtmp/A"), "A.seq"); + + // create distributed row matrix-like struct + // SequenceFile.Writer w = SequenceFile.createWriter( + // FileSystem.getLocal(conf), conf, aLocPath, IntWritable.class, + // VectorWritable.class, CompressionType.NONE, new DefaultCodec()); + // closeables.addFirst(w); + + // make input equivalent to 2 mln non-zero elements. + // With 100mln the precision turns out to be only better (LLN law i guess) + // With oversampling of 100, i don't get any error at all. + int n = 100; + int m = 2000; + Vector singularValues = + new DenseVector(new double[] { 10, 4, 1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, + 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, + 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, + 0.1, 0.1, 0.1, 0.1, 0.1, 0.1 }); + + SSVDTestsHelper.generateDenseInput(aLocPath, + FileSystem.getLocal(conf), + singularValues, + m, + n); + + FileSystem fs = FileSystem.get(aLocPath.toUri(), conf); + + Path tempDirPath = getTestTempDirPath("svd-proc"); + Path aPath = new Path(tempDirPath, "A/A.seq"); + fs.copyFromLocalFile(aLocPath, aPath); + + Path svdOutPath = new Path(tempDirPath, "SSVD-out"); + + // Solver starts here: + System.out.println("Input prepared, starting solver..."); + + int ablockRows = 867; + int p = 10; + int k = 3; + SSVDSolver ssvd = + new SSVDSolver(conf, + new Path[] { aPath }, + svdOutPath, + ablockRows, + k, + p, + 3); + /* + * these are only tiny-test values to simulate high load cases, in reality + * one needs much bigger + */ + ssvd.setOuterBlockHeight(500); + ssvd.setAbtBlockHeight(400); + ssvd.setOverwrite(true); + ssvd.setQ(q); + ssvd.setBroadcast(false); + ssvd.run(); + + Vector stochasticSValues = ssvd.getSingularValues(); + System.out.println("--SSVD solver singular values:"); + dumpSv(stochasticSValues); + + // the full-rank svd for this test size takes too long to run, + // so i comment it out, instead, i will be comparing + // result singular values to the original values used + // to generate input (which are guaranteed to be right). 
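/*
 * (Editor's note.) Comparing against the generating singular values works because
 * SSVDTestsHelper.generateDenseInput builds the input from the requested spectrum
 * {10, 4, 1, 0.1, ...}, so the true values are known without running a full-rank SVD.
 * The assertion just below is a relative-error bound on the k leading values:
 *
 *   |sigma_true(i) - sigma_ssvd(i)| / sigma_true(i)  <=  s_precisionPct / 100,  i < k
 *
 * i.e. within 10 percent, deliberately loose for a randomized algorithm (the field
 * comment above reports roughly 3 percent observed worst case). That
 * generateDenseInput produces exactly this spectrum is inferred from the surrounding
 * comments, not re-verified here.
 */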
+ + /* + * System.out.println("--Colt SVD solver singular values:"); // try to run + * + * the same thing without stochastic algo double[][] a = + * SSVDSolver.drmLoadAsDense(fs, aPath, conf); + * + * + * + * SingularValueDecomposition svd2 = new SingularValueDecomposition(new + * DenseMatrix(a)); + * + * a = null; + * + * double[] svalues2 = svd2.getSingularValues(); dumpSv(svalues2); + * + * for (int i = 0; i < k ; i++) { Assert .assertTrue(1-Math.abs((svalues2[i] + * - stochasticSValues[i])/svalues2[i]) <= s_precisionPct/100); } + */ + + // assert first k against those + // used to generate surrogate input + + for (int i = 0; i < k; i++) { + assertTrue(Math.abs((singularValues.getQuick(i) - stochasticSValues.getQuick(i)) + / singularValues.getQuick(i)) <= s_precisionPct / 100); + } + + DenseMatrix mQ = + SSVDHelper.drmLoadAsDense(fs, new Path(svdOutPath, "Bt-job/" + + BtJob.OUTPUT_Q + "-*"), conf); + + SSVDCommonTest.assertOrthonormality(mQ, + false, + s_epsilon); + + DenseMatrix u = + SSVDHelper.drmLoadAsDense(fs, + new Path(svdOutPath, "U/*"), + conf); + SSVDCommonTest.assertOrthonormality(u, false, s_epsilon); + + DenseMatrix v = + SSVDHelper.drmLoadAsDense(fs, + new Path(svdOutPath, "V/*"), + conf); + SSVDCommonTest.assertOrthonormality(v, false, s_epsilon); + } + + static void dumpSv(Vector s) { + System.out.printf("svs: "); + for (Vector.Element el : s.all()) { + System.out.printf("%f ", el.get()); + } + System.out.println(); + + } + +}
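A closing note on the orthonormality assertions used throughout these SSVD tests: SSVDCommonTest.assertOrthonormality (its implementation is not part of this commit) checks that the loaded Q, U and V factors have orthonormal columns, i.e. that the Gram matrix of the columns is the identity within epsilon. A minimal self-contained sketch of that property check over plain arrays, as an assumption about what the real helper verifies:

    // Asserts that the columns of m (rows x cols, rows >= cols) are orthonormal:
    // every column has unit length and distinct columns are mutually orthogonal.
    static void assertColumnOrthonormality(double[][] m, double eps) {
      int rows = m.length;
      int cols = m[0].length;
      for (int i = 0; i < cols; i++) {
        for (int j = i; j < cols; j++) {
          double dot = 0;
          for (int r = 0; r < rows; r++) {
            dot += m[r][i] * m[r][j];
          }
          double expected = (i == j) ? 1.0 : 0.0;
          if (Math.abs(dot - expected) > eps) {
            throw new AssertionError("columns " + i + " and " + j + ": gram entry " + dot);
          }
        }
      }
    }

An orthonormal Q is what makes B = Q' A a faithful compression of A in the SSVD derivation, which is why both the sparse PCA test and this dense test assert it on the Bt-job output.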
