/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.mahout.math.hadoop.stochasticsvd;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Writable;
import org.apache.mahout.common.IOUtils;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
import org.apache.mahout.math.ssvd.EigenSolverWrapper;

/**
 * Stochastic SVD solver (API class).
 * <P>
 *
 * Implementation details are in the working notes attached to MAHOUT-376
 * (https://issues.apache.org/jira/browse/MAHOUT-376).
 * <P>
 *
 * As of the time of this writing, there are no benchmarks for this method in
 * comparison to other methods. However, non-hadoop differentiating
 * characteristics of this method are thought to be:
 * <LI>"faster" -- precision is traded off in favor of speed. However, there's
 * a lever in terms of the "oversampling parameter" p. Higher values of p
 * produce better precision but trade off speed (and minimum RAM requirement).
 * This also means that this method is almost guaranteed to be less precise
 * than Lanczos unless full rank SVD decomposition is sought.
 * <LI>"more scale" -- can presumably take on larger problems than the Lanczos
 * solver (not confirmed by benchmark at this time).
 * <P><P>
 *
 * Implementation-specific differentiating points:
 * <LI>no need to specify input matrix height or width on the command line; it
 * is whatever it gets to be.
 * <LI>supports any Writable as DRM row keys and copies them to the
 * corresponding rows of the U matrix;
 * <LI>can request U or V or U<sub>&sigma;</sub>=U&middot;&Sigma;<sup>0.5</sup>
 * or V<sub>&sigma;</sub>=V&middot;&Sigma;<sup>0.5</sup>, none of which requires
 * another pass over input A; these jobs are parallel map-only jobs.
 * <P><P>
 *
 * This class is the central public API for the SSVD solver. The use pattern is
 * as follows:
 *
 * <UL>
 * <LI>create the solver using the constructor, supplying computation
 * parameters.
 * <LI>set optional parameters through setter methods.
 * <LI>call {@link #run()}.
 * <LI>{@link #getUPath()} (if computed) returns the path to the directory
 * containing m x k U matrix file(s).
 * <LI>{@link #getVPath()} (if computed) returns the path to the directory
 * containing n x k V matrix file(s).
 * </UL>
 */
public class SSVDSolver {

  /** k+p singular values, populated by {@link #run()}. */
  private double[] svalues;
  private boolean computeU = true;
  private boolean computeV = true;
  /** Output locations, populated by {@link #run()} on success. */
  private String uPath;
  private String vPath;

  // configured stuff
  private Configuration conf;
  private Path[] inputPath;
  private Path outputPath;
  private int ablockRows;
  private int k;
  private int p;
  private int reduceTasks;
  private int minSplitSize = -1;
  private boolean cUHalfSigma;
  private boolean cVHalfSigma;

  /**
   * create new SSVD solver. Required parameters are passed to the constructor
   * to ensure they are set. Optional parameters can be set using setters.
   * <P>
   *
   * @param conf
   *            hadoop configuration
   * @param inputPath
   *            Input path (should be compatible with DistributedRowMatrix as
   *            of the time of this writing).
   * @param outputPath
   *            Output path containing U, V and singular values vector files.
   * @param ablockRows
   *            The vertical height of a q-block (bigger values require more
   *            memory in mappers, perhaps larger <code>minSplitSize</code>
   *            values).
   * @param k
   *            desired rank
   * @param p
   *            SSVD oversampling parameter
   * @param reduceTasks
   *            Number of reduce tasks (where applicable)
   * @throws IOException
   *            when an IO condition occurs.
   */
  public SSVDSolver(Configuration conf, Path[] inputPath, Path outputPath,
      int ablockRows, int k, int p, int reduceTasks) throws IOException {
    this.conf = conf;
    this.inputPath = inputPath;
    this.outputPath = outputPath;
    this.ablockRows = ablockRows;
    this.k = k;
    this.p = p;
    this.reduceTasks = reduceTasks;
  }

  /** Request U&middot;&Sigma;<sup>0.5</sup> instead of plain U. */
  public void setcUHalfSigma(boolean cUHat) {
    this.cUHalfSigma = cUHat;
  }

  /** Request V&middot;&Sigma;<sup>0.5</sup> instead of plain V. */
  public void setcVHalfSigma(boolean cVHat) {
    this.cVHalfSigma = cVHat;
  }

  /**
   * The setting controlling whether to compute the U matrix of low rank SSVD.
   *
   * @param val
   *          true if we want to output the U matrix. Default is true.
   */
  public void setComputeU(boolean val) {
    computeU = val;
  }

  /**
   * Setting controlling whether to compute the V matrix of low-rank SSVD.
   *
   * @param val
   *          true if we want to output the V matrix. Default is true.
   */
  public void setComputeV(boolean val) {
    computeV = val;
  }

  /**
   * Sometimes, if requested A blocks become larger than a split, we may need
   * to use this to ensure at least k+p rows of A get into a split. This is a
   * requirement necessary to obtain orthonormalized Q blocks of SSVD.
   *
   * @param size
   *          the minimum split size to use
   */
  public void setMinSplitSize(int size) {
    minSplitSize = size;
  }

  /**
   * This contains the k+p singular values resulting from the solver run.
   * Note: the internal array is returned, not a copy.
   *
   * @return singular values (largest to smallest)
   */
  public double[] getSingularValues() {
    return svalues;
  }

  /**
   * returns U path (if computation was requested and successful).
   *
   * @return U output hdfs path, or null if computation was not completed for
   *         whatever reason.
   */
  public String getUPath() {
    return uPath;
  }

  /**
   * returns V path (if computation was requested and successful).
   *
   * @return V output hdfs path, or null if computation was not completed for
   *         whatever reason.
   */
  public String getVPath() {
    return vPath;
  }

  /**
   * run all SSVD jobs: Q-job, Bt-job, BBt-job, then a local eigen-solve of
   * B&middot;B' followed by the (optional) map-only U and V jobs.
   *
   * @throws IOException
   *           if an I/O condition occurs.
   */
  public void run() throws IOException {

    LinkedList<Closeable> closeables = new LinkedList<Closeable>();
    try {
      Class<? extends Writable> labelType = sniffInputLabelType(inputPath,
          conf, closeables);
      FileSystem fs = FileSystem.get(conf);

      Path qPath = new Path(outputPath, "Q-job");
      Path btPath = new Path(outputPath, "Bt-job");
      Path bbtPath = new Path(outputPath, "BBt-job");
      Path uHatPath = new Path(outputPath, "UHat");
      Path svPath = new Path(outputPath, "Sigma");
      Path uPath = new Path(outputPath, "U");
      Path vPath = new Path(outputPath, "V");

      // wipe any previous output so the solver can be re-run repeatedly.
      fs.delete(qPath, true);
      fs.delete(btPath, true);
      fs.delete(bbtPath, true);
      fs.delete(uHatPath, true);
      fs.delete(svPath, true);
      fs.delete(uPath, true);
      fs.delete(vPath, true);

      Random rnd = new Random();
      long seed = rnd.nextLong();

      QJob.run(conf, inputPath, qPath, ablockRows, minSplitSize, k, p, seed,
          reduceTasks);

      BtJob.run(conf, inputPath, qPath, btPath, minSplitSize, k, p,
          reduceTasks, labelType);

      BBtJob.run(conf, new Path(btPath, BtJob.OUTPUT_BT + "-*"), bbtPath, 1);

      UpperTriangular bbt = loadUpperTriangularMatrix(fs, new Path(bbtPath,
          BBtJob.OUTPUT_BBT + "-*"), conf);

      // convert bbt to a dense symmetric array our eigensolver understands
      assert bbt.columnSize() == k + p;

      int kp = k + p;

      double[][] bbtSquare = new double[kp][];
      for (int i = 0; i < kp; i++) {
        bbtSquare[i] = new double[kp];
      }

      for (int i = 0; i < kp; i++) {
        for (int j = i; j < kp; j++) {
          bbtSquare[i][j] = bbtSquare[j][i] = bbt.getQuick(i, j);
        }
      }

      svalues = new double[kp];

      EigenSolverWrapper eigenWrapper = new EigenSolverWrapper(bbtSquare);

      // singular values of A are the square roots of the eigenvalues of B*B'
      double[] eigenva2 = eigenWrapper.getEigenValues();
      for (int i = 0; i < kp; i++) {
        svalues[i] = Math.sqrt(eigenva2[i]);
      }

      // save/redistribute UHat
      double[][] uHat = eigenWrapper.getUHat();

      fs.mkdirs(uHatPath);
      // note: uHatPath is deliberately rebound to the file inside the dir --
      // the U/V jobs below expect the file path.
      uHatPath = new Path(uHatPath, "uhat.seq");
      SequenceFile.Writer uHatWriter = SequenceFile.createWriter(fs, conf,
          uHatPath, IntWritable.class, VectorWritable.class,
          CompressionType.BLOCK);
      closeables.addFirst(uHatWriter);

      int m = uHat.length;
      IntWritable iw = new IntWritable();
      VectorWritable vw = new VectorWritable();
      for (int i = 0; i < m; i++) {
        vw.set(new DenseVector(uHat[i], true));
        iw.set(i);
        uHatWriter.append(iw, vw);
      }

      closeables.remove(uHatWriter);
      uHatWriter.close();

      // same rebinding trick for the singular-values file.
      svPath = new Path(svPath, "svalues.seq");
      SequenceFile.Writer svWriter = SequenceFile.createWriter(fs, conf,
          svPath, IntWritable.class, VectorWritable.class,
          CompressionType.BLOCK);

      closeables.addFirst(svWriter);

      vw.set(new DenseVector(svalues, true));
      // iw still holds the last UHat row index; the sigma row key is not
      // interpreted downstream.
      svWriter.append(iw, vw);

      closeables.remove(svWriter);
      svWriter.close();

      UJob ujob = null;
      VJob vjob = null;
      if (computeU) {
        // map-only job
        ujob = new UJob();
        ujob.start(conf, new Path(btPath, BtJob.OUTPUT_Q + "-*"), uHatPath,
            svPath, uPath, k, reduceTasks, labelType, cUHalfSigma);
      }

      if (computeV) {
        vjob = new VJob();
        vjob.start(conf, new Path(btPath, BtJob.OUTPUT_BT + "-*"), uHatPath,
            svPath, vPath, k, reduceTasks, cVHalfSigma);
      }

      if (ujob != null) {
        ujob.waitForCompletion();
        this.uPath = uPath.toString();
      }
      if (vjob != null) {
        vjob.waitForCompletion();
        this.vPath = vPath.toString();
      }

    } catch (InterruptedException exc) {
      throw new IOException("Interrupted", exc);
    } catch (ClassNotFoundException exc) {
      throw new IOException(exc);
    } finally {
      IOUtils.close(closeables);
    }

  }

  /**
   * Determines the Writable key class of the first readable input sequence
   * file, so the same label type can be carried through to the U output.
   */
  private static Class<? extends Writable> sniffInputLabelType(
      Path[] inputPath, Configuration conf, LinkedList<Closeable> closeables)
      throws IOException {
    FileSystem fs = FileSystem.get(conf);
    for (Path p : inputPath) {
      FileStatus[] fstats = fs.globStatus(p);
      if (fstats == null || fstats.length == 0) {
        continue;
      }
      SequenceFile.Reader r = new SequenceFile.Reader(fs, fstats[0].getPath(),
          conf);
      closeables.addFirst(r);

      try {
        return r.getKeyClass().asSubclass(Writable.class);
      } finally {
        closeables.remove(r);
        r.close();
      }
    }

    throw new IOException(
        "Unable to open input files to determine input label type.");
  }

  private static final Pattern OUTPUT_FILE_PATTERN = Pattern
      .compile("(\\w+)-(m|r)-(\\d+)(\\.\\w+)?");

  /**
   * Orders map/reduce part files by their numeric partition suffix. A fresh
   * Matcher is created per comparison because java.util.regex.Matcher is not
   * thread-safe and this comparator is shared.
   */
  static Comparator<FileStatus> partitionComparator =
      new Comparator<FileStatus>() {

        @Override
        public int compare(FileStatus o1, FileStatus o2) {
          // partition numbers are small, so plain subtraction cannot overflow
          return extractPartition(o1) - extractPartition(o2);
        }

        private int extractPartition(FileStatus stat) {
          Matcher matcher = OUTPUT_FILE_PATTERN.matcher(stat.getPath()
              .getName());
          if (!matcher.matches()) {
            throw new IllegalStateException(String.format(
                "Unexpected file name, unable to deduce partition #:%s", stat
                    .getPath().toString()));
          }
          return Integer.parseInt(matcher.group(3));
        }

      };

  /**
   * helper capability to load distributed row matrices into a dense matrix
   * (to support tests mainly).
   *
   * @param fs
   *          filesystem
   * @param glob
   *          FS glob
   * @param conf
   *          configuration
   * @return Dense matrix array, or null if the glob matched nothing
   * @throws IOException
   *           when I/O occurs.
   */
  public static double[][] loadDistributedRowMatrix(FileSystem fs, Path glob,
      Configuration conf) throws IOException {

    FileStatus[] files = fs.globStatus(glob);
    if (files == null) {
      return null;
    }

    List<double[]> denseData = new ArrayList<double[]>();
    IntWritable iw = new IntWritable();
    VectorWritable vw = new VectorWritable();

    // assume it is partitioned output, so we need to read the parts up
    // in order of partitions.
    Arrays.sort(files, partitionComparator);

    for (FileStatus fstat : files) {
      Path file = fstat.getPath();
      SequenceFile.Reader reader = new SequenceFile.Reader(fs, file, conf);
      try {
        while (reader.next(iw, vw)) {
          Vector v = vw.get();
          int size = v.size();
          double[] row = new double[size];
          for (int i = 0; i < size; i++) {
            row[i] = v.get(i);
          }
          // row labels are ignored; rows are collected in file order.
          denseData.add(row);
        }
      } finally {
        reader.close();
      }
    }

    return denseData.toArray(new double[denseData.size()][]);
  }

  /**
   * Loads exactly one packed upper-triangular matrix row from the glob.
   *
   * @return the matrix, or null if the glob matched nothing
   * @throws IOException
   *           if more or fewer than one row is found, or on I/O failure.
   */
  public static UpperTriangular loadUpperTriangularMatrix(FileSystem fs,
      Path glob, Configuration conf) throws IOException {

    FileStatus[] files = fs.globStatus(glob);
    if (files == null) {
      return null;
    }

    IntWritable iw = new IntWritable();
    VectorWritable vw = new VectorWritable();
    UpperTriangular result = null;

    // assume it is partitioned output, so we need to read the parts up
    // in order of partitions.
    Arrays.sort(files, partitionComparator);

    for (FileStatus fstat : files) {
      Path file = fstat.getPath();
      SequenceFile.Reader reader = new SequenceFile.Reader(fs, file, conf);
      try {
        while (reader.next(iw, vw)) {
          Vector v = vw.get();
          if (result == null) {
            result = new UpperTriangular(v);
          } else {
            throw new IOException(
                "Unexpected overrun in upper triangular matrix files");
          }
        }
      } finally {
        reader.close();
      }
    }

    if (result == null) {
      throw new IOException(
          "Unexpected underrun in upper triangular matrix files");
    }
    return result;
  }

}
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.mahout.math.hadoop.stochasticsvd;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.mahout.math.DenseMatrix;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.Matrix;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

/**
 * Computes U = Q * UHat of SSVD (optionally multiplying by
 * &Sigma;<sup>0.5</sup> when the half-sigma option is requested).
 */
public class UJob {
  private static final String OUTPUT_U = "u";
  private static final String PROP_UHAT_PATH = "ssvd.uhat.path";
  private static final String PROP_SIGMA_PATH = "ssvd.sigma.path";
  private static final String PROP_U_HALFSIGMA = "ssvd.u.halfsigma";
  private static final String PROP_K = "ssvd.k";

  private Job job;

  /**
   * Submits the (map-only) U job asynchronously; pair with
   * {@link #waitForCompletion()}.
   *
   * @param conf           hadoop configuration
   * @param inputPathQ     glob of Q-block sequence files
   * @param inputUHatPath  path of the UHat matrix file
   * @param sigmaPath      path of the singular-values file
   * @param outputPath     where U is written
   * @param k              decomposition rank
   * @param numReduceTasks unused -- the job runs with 0 reducers
   * @param labelClass     Writable key class of A's rows, carried to U
   * @param uHalfSigma     if true, produce U * Sigma^0.5 instead of U
   */
  public void start(Configuration conf, Path inputPathQ, Path inputUHatPath,
      Path sigmaPath, Path outputPath, int k, int numReduceTasks,
      Class<? extends Writable> labelClass, boolean uHalfSigma)
      throws ClassNotFoundException, InterruptedException, IOException {

    job = new Job(conf);
    job.setJobName("U-job");
    job.setJarByClass(UJob.class);

    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    FileInputFormat.setInputPaths(job, inputPathQ);
    FileOutputFormat.setOutputPath(job, outputPath);

    // Warn: tight hadoop integration here:
    job.getConfiguration().set("mapreduce.output.basename", OUTPUT_U);
    SequenceFileOutputFormat.setCompressOutput(job, true);
    SequenceFileOutputFormat
        .setOutputCompressorClass(job, DefaultCodec.class);
    SequenceFileOutputFormat.setOutputCompressionType(job,
        CompressionType.BLOCK);

    job.setMapperClass(UMapper.class);
    job.setMapOutputKeyClass(IntWritable.class);
    job.setMapOutputValueClass(VectorWritable.class);

    job.setOutputKeyClass(labelClass);
    job.setOutputValueClass(VectorWritable.class);

    job.getConfiguration().set(PROP_UHAT_PATH, inputUHatPath.toString());
    job.getConfiguration().set(PROP_SIGMA_PATH, sigmaPath.toString());
    if (uHalfSigma) {
      job.getConfiguration().set(PROP_U_HALFSIGMA, "y");
    }
    job.getConfiguration().setInt(PROP_K, k);
    job.setNumReduceTasks(0);
    job.submit();

  }

  /**
   * Blocks until the submitted job finishes.
   *
   * @throws IOException if the job did not complete successfully.
   */
  public void waitForCompletion() throws IOException, ClassNotFoundException,
      InterruptedException {
    job.waitForCompletion(false);

    if (!job.isSuccessful()) {
      throw new IOException("U job unsuccessful.");
    }

  }

  /**
   * Maps each Q row to the corresponding U row:
   * uRow[i] = qRow . uHatColumn(i), optionally scaled by sigma^0.5.
   */
  public static final class UMapper extends
      Mapper<Writable, VectorWritable, Writable, VectorWritable> {

    private Matrix uHat;
    private DenseVector uRow;
    private VectorWritable uRowWritable;
    private int kp;
    private int k;
    // non-null only when half-sigma output was requested
    private Vector sValues;

    @Override
    protected void map(Writable key, VectorWritable value, Context context)
        throws IOException, InterruptedException {
      Vector qRow = value.get();
      if (sValues != null) {
        for (int i = 0; i < k; i++) {
          uRow.setQuick(i,
              qRow.dot(uHat.getColumn(i)) * sValues.getQuick(i));
        }
      } else {
        for (int i = 0; i < k; i++) {
          uRow.setQuick(i, qRow.dot(uHat.getColumn(i)));
        }
      }

      context.write(key, uRowWritable); // U inherits original A row labels.
    }

    @Override
    protected void setup(Context context) throws IOException,
        InterruptedException {
      super.setup(context);
      FileSystem fs = FileSystem.get(context.getConfiguration());
      Path uHatPath = new Path(context.getConfiguration().get(PROP_UHAT_PATH));
      Path sigmaPath = new Path(context.getConfiguration().get(PROP_SIGMA_PATH));

      uHat = new DenseMatrix(SSVDSolver.loadDistributedRowMatrix(fs,
          uHatPath, context.getConfiguration()));
      // since uHat is (k+p) x (k+p)
      kp = uHat.columnSize();
      k = context.getConfiguration().getInt(PROP_K, kp);
      uRow = new DenseVector(k);
      uRowWritable = new VectorWritable(uRow);

      if (context.getConfiguration().get(PROP_U_HALFSIGMA) != null) {
        sValues = new DenseVector(SSVDSolver.loadDistributedRowMatrix(fs,
            sigmaPath, context.getConfiguration())[0], true);
        // pre-compute sigma^0.5 so map() only multiplies
        for (int i = 0; i < k; i++) {
          sValues.setQuick(i, Math.sqrt(sValues.getQuick(i)));
        }
      }

    }

  }

}

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.mahout.math.hadoop.stochasticsvd;

import org.apache.mahout.math.AbstractMatrix;
import org.apache.mahout.math.Matrix;
import org.apache.mahout.math.Vector;

/**
 * Quick and dirty implementation of some {@link Matrix} methods over a packed
 * upper triangular matrix. The n x n matrix is stored row-major in a single
 * array holding only the entries on or above the diagonal.
 */
public class UpperTriangular extends AbstractMatrix {

  // assume anything with absolute value less than this to be 0 during
  // non-upper assignments
  private static final double EPSILON = 1e-12;

  /** packed row-major storage of the upper triangle */
  private double[] values;
  /** matrix dimension (n x n) */
  private int n;

  /**
   * represents an n x n upper triangular matrix
   *
   * @param n matrix dimension
   */
  public UpperTriangular(int n) {
    super();

    values = new double[n * (n + 1) / 2];
    this.n = n;
    cardinality[0] = cardinality[1] = n;
  }

  /**
   * Builds the matrix from its packed representation. The dimension n is
   * recovered from the packed length n(n+1)/2.
   *
   * @param data packed upper-triangular data, length n(n+1)/2
   */
  public UpperTriangular(Vector data) {
    n = (int) Math.round((-1 + Math.sqrt(1 + 8 * data.size())) / 2);
    cardinality[0] = cardinality[1] = n;
    values = new double[n * (n + 1) / 2];
    int len = data.size();
    // System.arraycopy would've been much faster, but this way it's a drag
    // on the B-t job.
    for (int i = 0; i < len; i++) {
      values[i] = data.getQuick(i);
    }
  }

  /**
   * Builds the matrix from a packed array.
   *
   * @param data    packed upper-triangular data, length n(n+1)/2
   * @param shallow if true, adopt the array without copying
   */
  public UpperTriangular(double[] data, boolean shallow) {
    super();
    if (data == null) {
      throw new IllegalArgumentException("data");
    }
    values = shallow ? data : data.clone();
    n = (int) Math.round((-1 + Math.sqrt(1 + 8 * data.length)) / 2);
    cardinality[0] = cardinality[1] = n;
  }

  // copy-constructor
  public UpperTriangular(UpperTriangular mx) {
    this(mx.values, false);
  }

  @Override
  public Matrix assignColumn(int column, Vector other) {
    throw new UnsupportedOperationException();
  }

  @Override
  public Matrix assignRow(int row, Vector other) {
    // entries left of the diagonal must be numerically zero; use the
    // magnitude so large negative values are rejected too.
    for (int i = 0; i < row; i++) {
      if (Math.abs(other.getQuick(i)) > EPSILON) {
        throw new IllegalArgumentException("non-triangular source");
      }
    }
    for (int i = row; i < n; i++) {
      setQuick(row, i, other.get(i));
    }
    return this;
  }

  /**
   * Assigns row data directly from an array; only positions row..n-1 of
   * <code>other</code> are read.
   */
  public Matrix assignRow(int row, double[] other) {
    System.arraycopy(other, row, values, getL(row, row), n - row);
    return this;
  }

  @Override
  public Vector getColumn(int column) {
    throw new UnsupportedOperationException();
  }

  @Override
  public Vector getRow(int row) {
    throw new UnsupportedOperationException();
  }

  @Override
  public double getQuick(int row, int column) {
    // everything below the diagonal is implicitly zero
    if (row > column) {
      return 0;
    }
    return values[getL(row, column)];
  }

  /** linear offset of (row, col) within the packed upper-triangular storage */
  private int getL(int row, int col) {
    return (((n << 1) - row + 1) * row >> 1) + col - row;
  }

  @Override
  public Matrix like() {
    throw new UnsupportedOperationException();
  }

  @Override
  public Matrix like(int rows, int columns) {
    throw new UnsupportedOperationException();
  }

  @Override
  public void setQuick(int row, int column, double value) {
    values[getL(row, column)] = value;
  }

  @Override
  public int[] getNumNondefaultElements() {
    throw new UnsupportedOperationException();
  }

  @Override
  public Matrix viewPart(int[] offset, int[] size) {
    throw new UnsupportedOperationException();
  }

  /** package-private access to the packed backing array (no copy) */
  double[] getData() {
    return values;
  }

}
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/VJob.java?rev=1078694&view=auto ============================================================================== --- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/VJob.java (added) +++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/VJob.java Mon Mar 7 06:34:12 2011 @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.math.hadoop.stochasticsvd; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.SequenceFile.CompressionType; +import org.apache.hadoop.io.compress.DefaultCodec; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.mahout.math.DenseMatrix; +import org.apache.mahout.math.DenseVector; +import org.apache.mahout.math.Matrix; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorWritable; + +/** + * Computes U=Q*Uhat of SSVD + * + * + */ +public class VJob { + private static final String OUTPUT_V = "v"; + private static final String PROP_UHAT_PATH = "ssvd.uhat.path"; + private static final String PROP_SIGMA_PATH = "ssvd.sigma.path"; + private static final String PROP_V_HALFSIGMA = "ssvd.v.halfsigma"; + private static final String PROP_K = "ssvd.k"; + + private Job job; + + public void start(Configuration conf, Path inputPathBt, Path inputUHatPath, + Path inputSigmaPath, Path outputPath, int k, int numReduceTasks, + boolean vHalfSigma) throws ClassNotFoundException, InterruptedException, + IOException { + + job = new Job(conf); + job.setJobName("V-job"); + job.setJarByClass(VJob.class); + + job.setInputFormatClass(SequenceFileInputFormat.class); + job.setOutputFormatClass(SequenceFileOutputFormat.class); + FileInputFormat.setInputPaths(job, inputPathBt); + FileOutputFormat.setOutputPath(job, outputPath); + + // Warn: tight hadoop integration here: + job.getConfiguration().set("mapreduce.output.basename", OUTPUT_V); + 
SequenceFileOutputFormat.setCompressOutput(job, true); + SequenceFileOutputFormat + .setOutputCompressorClass(job, DefaultCodec.class); + SequenceFileOutputFormat.setOutputCompressionType(job, + CompressionType.BLOCK); + + job.setMapOutputKeyClass(IntWritable.class); + job.setMapOutputValueClass(VectorWritable.class); + + job.setOutputKeyClass(IntWritable.class); + job.setOutputValueClass(VectorWritable.class); + + job.setMapperClass(VMapper.class); + + job.getConfiguration().set(PROP_UHAT_PATH, inputUHatPath.toString()); + job.getConfiguration().set(PROP_SIGMA_PATH, inputSigmaPath.toString()); + if (vHalfSigma) + job.getConfiguration().set(PROP_V_HALFSIGMA, "y"); + job.getConfiguration().setInt(PROP_K, k); + job.setNumReduceTasks(0); + job.submit(); + + } + + public void waitForCompletion() throws IOException, ClassNotFoundException, + InterruptedException { + job.waitForCompletion(false); + + if (!job.isSuccessful()) + throw new IOException("V job unsuccessful."); + + } + + public static final class VMapper extends + Mapper<IntWritable, VectorWritable, IntWritable, VectorWritable> { + + private Matrix uHat; + private DenseVector vRow; + private DenseVector sValues; + private VectorWritable vRowWritable; + private int kp; + private int k; + + @Override + protected void map(IntWritable key, VectorWritable value, Context context) + throws IOException, InterruptedException { + Vector qRow = value.get(); + for (int i = 0; i < k; i++) + vRow.setQuick(i, + qRow.dot(uHat.getColumn(i)) / sValues.getQuick(i)); + context.write(key, vRowWritable); // U inherits original A row labels. 
+ } + + @Override + protected void setup(Context context) throws IOException, + InterruptedException { + super.setup(context); + FileSystem fs = FileSystem.get(context.getConfiguration()); + Path uHatPath = new Path(context.getConfiguration().get(PROP_UHAT_PATH)); + + Path sigmaPath = new Path(context.getConfiguration().get(PROP_SIGMA_PATH)); + + uHat = new DenseMatrix(SSVDSolver.loadDistributedRowMatrix(fs, + uHatPath, context.getConfiguration())); + // since uHat is (k+p) x (k+p) + kp = uHat.columnSize(); + k = context.getConfiguration().getInt(PROP_K, kp); + vRow = new DenseVector(k); + vRowWritable = new VectorWritable(vRow); + + sValues = new DenseVector(SSVDSolver.loadDistributedRowMatrix(fs, + sigmaPath, context.getConfiguration())[0], true); + if (context.getConfiguration().get(PROP_V_HALFSIGMA) != null) + for (int i = 0; i < k; i++) + sValues.setQuick(i, Math.sqrt(sValues.getQuick(i))); + + } + + } + +} Added: mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverTest.java URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverTest.java?rev=1078694&view=auto ============================================================================== --- mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverTest.java (added) +++ mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverTest.java Mon Mar 7 06:34:12 2011 @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.math.hadoop.stochasticsvd; + +import java.io.Closeable; +import java.io.File; +import java.util.LinkedList; +import java.util.Random; + +import junit.framework.Assert; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.SequenceFile.CompressionType; +import org.apache.hadoop.io.compress.DefaultCodec; +import org.apache.mahout.common.MahoutTestCase; +import org.apache.mahout.math.DenseMatrix; +import org.apache.mahout.math.DenseVector; +import org.apache.mahout.math.SingularValueDecomposition; +import org.apache.mahout.math.VectorWritable; +import org.junit.Test; + +/** + * + * Tests SSVD solver with a made-up data running hadoop + * solver in a local mode. It requests full-rank SSVD and + * then compares singular values to that of Colt's SVD + * asserting epsilon(precision) 1e-10 or whatever most recent + * value configured. 
+ * + */ +public class LocalSSVDSolverTest extends MahoutTestCase { + + private static final double s_epsilon = 1.0E-10d; + + @Test + public void testSSVDSolver() throws Exception { + + Configuration conf = new Configuration(); + conf.set("mapred.job.tracker", "local"); + conf.set("fs.default.name", "file:///"); + + // conf.set("mapred.job.tracker","localhost:11011"); + // conf.set("fs.default.name","hdfs://localhost:11010/"); + + LinkedList<Closeable> closeables = new LinkedList<Closeable>(); + Random rnd = new Random(); + int m = 1000, n = 100, k = 40, p = 60, ablockRows = 251; + + double muAmplitude = 5e+1; + + File tmpDir = new File("svdtmp"); + tmpDir.mkdir(); + conf.set("hadoop.tmp.dir", tmpDir.getAbsolutePath()); + + File aDir = new File(tmpDir, "A"); + aDir.mkdir(); + + Path aLocPath = new Path(new Path(aDir.getAbsolutePath()), "A.seq"); + + // create distributed row matrix-like struct + SequenceFile.Writer w = SequenceFile.createWriter( + FileSystem.getLocal(conf), conf, aLocPath, IntWritable.class, + VectorWritable.class, CompressionType.BLOCK, new DefaultCodec()); + closeables.addFirst(w); + + double[] row = new double[n]; + DenseVector dv = new DenseVector(row, true); + VectorWritable vw = new VectorWritable(dv); + IntWritable roww = new IntWritable(); + + for (int i = 0; i < m; i++) { + for (int j = 0; j < n; j++) + row[j] = muAmplitude * (rnd.nextDouble() - 0.5); + roww.set(i); + w.append(roww, vw); + } + closeables.remove(w); + w.close(); + + FileSystem fs = FileSystem.get(conf); + + Path tempDirPath = new Path(fs.getWorkingDirectory(), "svd-proc"); + Path aPath = new Path(tempDirPath, "A/A.seq"); + fs.copyFromLocalFile(aLocPath, aPath); + + Path svdOutPath = new Path(tempDirPath, "SSVD-out"); + + // make sure we wipe out previous test results, just a convenience + fs.delete(svdOutPath, true); + + SSVDSolver ssvd = new SSVDSolver(conf, new Path[] { aPath }, svdOutPath, + ablockRows, k, p, 3); + // ssvd.setcUHalfSigma(true); + // 
ssvd.setcVHalfSigma(true); + ssvd.run(); + + double[] stochasticSValues = ssvd.getSingularValues(); + System.out.println("--SSVD solver singular values:"); + dumpSv(stochasticSValues); + System.out.println("--Colt SVD solver singular values:"); + + // try to run the same thing without stochastic algo + double[][] a = SSVDSolver.loadDistributedRowMatrix(fs, aPath, conf); + + // SingularValueDecompositionImpl svd=new SingularValueDecompositionImpl(new + // Array2DRowRealMatrix(a)); + SingularValueDecomposition svd2 = new SingularValueDecomposition( + new DenseMatrix(a)); + + a = null; + + double[] svalues2 = svd2.getSingularValues(); + dumpSv(svalues2); + + for (int i = 0; i < k + p; i++) + Assert + .assertTrue(Math.abs(svalues2[i] - stochasticSValues[i]) <= s_epsilon); + + double[][] q = SSVDSolver.loadDistributedRowMatrix(fs, new Path(svdOutPath, + "Bt-job/" + BtJob.OUTPUT_Q + "-*"), conf); + + SSVDPrototypeTest + .assertOrthonormality(new DenseMatrix(q), false, s_epsilon); + + double[][] u = SSVDSolver.loadDistributedRowMatrix(fs, new Path(svdOutPath, + "U/[^_]*"), conf); + + SSVDPrototypeTest + .assertOrthonormality(new DenseMatrix(u), false, s_epsilon); + double[][] v = SSVDSolver.loadDistributedRowMatrix(fs, new Path(svdOutPath, + "V/[^_]*"), conf); + + SSVDPrototypeTest + .assertOrthonormality(new DenseMatrix(v), false, s_epsilon); + } + + static void dumpSv(double[] s) { + System.out.printf("svs: "); + for (int i = 0; i < s.length; i++) + System.out.printf("%f ", s[i]); + System.out.println(); + + } + + static void dump(double[][] matrix) { + for (int i = 0; i < matrix.length; i++) { + for (int j = 0; j < matrix[i].length; j++) + System.out.printf("%f ", matrix[i][j]); + System.out.println(); + } + } + +} Added: mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDPrototypeTest.java URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDPrototypeTest.java?rev=1078694&view=auto ============================================================================== --- mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDPrototypeTest.java (added) +++ mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDPrototypeTest.java Mon Mar 7 06:34:12 2011 @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.math.hadoop.stochasticsvd; + +import java.util.Random; + +import junit.framework.Assert; + +import org.apache.mahout.common.MahoutTestCase; +import org.apache.mahout.math.DenseMatrix; +import org.apache.mahout.math.Matrix; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.function.DoubleFunction; +import org.junit.Test; + +/** + * Tests parts of of Stochastic SVD solver code in local mode + * using "prototype" code (class that simulates processes + * actually happenning in the MR jobs). 
+ * + * + */ +public class SSVDPrototypeTest extends MahoutTestCase { + + private static double s_scale = 1000; + private static double s_epsilon = 1e-10; + + public void testSSVDPrototype() throws Exception { + SSVDPrototype.main(null); + } + + @Test + public void testGivensQR() throws Exception { + // DenseMatrix m = new DenseMatrix(dims<<2,dims); + DenseMatrix m = new DenseMatrix(3, 3); + m.assign(new DoubleFunction() { + + Random m_rnd = new Random(); + + @Override + public double apply(double arg0) { + return m_rnd.nextDouble() * s_scale; + } + }); + + m.setQuick(0, 0, 1); + m.setQuick(0, 1, 2); + m.setQuick(0, 2, 3); + m.setQuick(1, 0, 4); + m.setQuick(1, 1, 5); + m.setQuick(1, 2, 6); + m.setQuick(2, 0, 7); + m.setQuick(2, 1, 8); + m.setQuick(2, 2, 9); + + GivensThinSolver qrSolver = new GivensThinSolver(m.rowSize(), + m.columnSize()); + qrSolver.solve(m); + + Matrix qtm = new DenseMatrix(qrSolver.getThinQtTilde()); + + assertOrthonormality(qtm.transpose(), false, s_epsilon); + + Matrix aClone = new DenseMatrix(qrSolver.getThinQtTilde()).transpose() + .times(qrSolver.getRTilde()); + + System.out.println("aclone : " + aClone); + + } + + public static void assertOrthonormality(Matrix mtx, boolean insufficientRank, + double epsilon) { + int n = mtx.columnSize(); + int rank = 0; + for (int i = 0; i < n; i++) { + Vector e_i = mtx.getColumn(i); + + double norm = e_i.norm(2); + + if (Math.abs(1 - norm) < epsilon) + rank++; + else + Assert.assertTrue(Math.abs(norm) < epsilon); + + for (int j = 0; j <= i; j++) { + Vector e_j = mtx.getColumn(j); + double dot = e_i.dot(e_j); + Assert + .assertTrue(Math.abs((i == j && rank > j ? 
1 : 0) - dot) < epsilon); + } + } + Assert.assertTrue((!insufficientRank && rank == n) + || (insufficientRank && rank < n)); + + } + +} Modified: mahout/trunk/math/pom.xml URL: http://svn.apache.org/viewvc/mahout/trunk/math/pom.xml?rev=1078694&r1=1078693&r2=1078694&view=diff ============================================================================== --- mahout/trunk/math/pom.xml (original) +++ mahout/trunk/math/pom.xml Mon Mar 7 06:34:12 2011 @@ -95,7 +95,7 @@ <groupId>org.apache.commons</groupId> <artifactId>commons-math</artifactId> <version>2.1</version> - <scope>test</scope> +<!-- <scope>test</scope>--> </dependency> <dependency> Added: mahout/trunk/math/src/main/java/org/apache/mahout/math/ssvd/EigenSolverWrapper.java URL: http://svn.apache.org/viewvc/mahout/trunk/math/src/main/java/org/apache/mahout/math/ssvd/EigenSolverWrapper.java?rev=1078694&view=auto ============================================================================== --- mahout/trunk/math/src/main/java/org/apache/mahout/math/ssvd/EigenSolverWrapper.java (added) +++ mahout/trunk/math/src/main/java/org/apache/mahout/math/ssvd/EigenSolverWrapper.java Mon Mar 7 06:34:12 2011 @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.mahout.math.ssvd; + +import org.apache.commons.math.linear.Array2DRowRealMatrix; +import org.apache.commons.math.linear.EigenDecompositionImpl; +import org.apache.commons.math.linear.RealMatrix; + +/** + * wraps appropriate eigen solver for BBt matrix. + * Can be either colt or apache commons math. <P> + * + * At the moment it is apache commons math which + * is only in mahout-math dependencies. <P> + * + * I will be happy to switch this to Colt eigensolver + * if it is proven reliable (i experience internal errors + * and unsorted singular values at some point). + * + * But for now commons-math seems to be more reliable. + * + * + * @author Dmitriy + * + */ +public class EigenSolverWrapper { + + private double[] m_eigenvalues; + private double[][] m_uHat; + + public EigenSolverWrapper(double[][] bbt ) { + super(); + int dim=bbt.length; + EigenDecompositionImpl evd2=new EigenDecompositionImpl(new Array2DRowRealMatrix(bbt),0); + m_eigenvalues=evd2.getRealEigenvalues(); + RealMatrix uHatrm= evd2.getV(); + m_uHat = new double[dim][]; + for ( int i = 0; i < dim; i++ ) m_uHat [i]=uHatrm.getRow(i); + } + + public double[][] getUHat() { return m_uHat; } + public double[] getEigenValues() { return m_eigenvalues; } + + + + +} Modified: mahout/trunk/src/conf/driver.classes.props URL: http://svn.apache.org/viewvc/mahout/trunk/src/conf/driver.classes.props?rev=1078694&r1=1078693&r2=1078694&view=diff ============================================================================== --- mahout/trunk/src/conf/driver.classes.props (original) +++ mahout/trunk/src/conf/driver.classes.props Mon Mar 7 06:34:12 2011 @@ -30,5 +30,6 @@ org.apache.mahout.classifier.sgd.RunLogi org.apache.mahout.classifier.sgd.PrintResourceOrFile = cat : Print a file or resource as the logistic regression models would see it org.apache.mahout.classifier.bayes.WikipediaXmlSplitter = wikipediaXMLSplitter : Reads wikipedia data and creates ch 
org.apache.mahout.classifier.bayes.WikipediaDatasetCreatorDriver = wikipediaDataSetCreator : Splits data set of wikipedia wrt feature like country +org.apache.mahout.math.hadoop.stochasticsvd.SSVDCli = ssvd : Stochastic SVD org.apache.mahout.clustering.spectral.eigencuts.EigencutsDriver = eigencuts : Eigencuts spectral clustering org.apache.mahout.clustering.spectral.kmeans.SpectralKMeansDriver = spectralkmeans : Spectral k-means clustering Added: mahout/trunk/src/conf/ssvd.props URL: http://svn.apache.org/viewvc/mahout/trunk/src/conf/ssvd.props?rev=1078694&view=auto ============================================================================== --- mahout/trunk/src/conf/ssvd.props (added) +++ mahout/trunk/src/conf/ssvd.props Mon Mar 7 06:34:12 2011 @@ -0,0 +1,13 @@ +#i|input = +#o|output = +#k|rank = +#t|tempDir = +#p|oversampling = +#r|blockHeight = +#s|minSplitSize = +#U|computeU = +#uhs|uHalfSigma = +#V|computeV = +#vhs|vHalfSigma = +#t|reduceTasks = +#w|wide =
