http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/math/MatrixUtils.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/math/MatrixUtils.java b/mr/src/main/java/org/apache/mahout/math/MatrixUtils.java new file mode 100644 index 0000000..f9ca52e --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/math/MatrixUtils.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.mahout.math; + +import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Writable; +import org.apache.mahout.common.Pair; +import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable; +import org.apache.mahout.math.map.OpenObjectIntHashMap; + +import java.io.IOException; +import java.util.List; + +public final class MatrixUtils { + + private MatrixUtils() { + } + + public static void write(Path outputDir, Configuration conf, VectorIterable matrix) + throws IOException { + FileSystem fs = outputDir.getFileSystem(conf); + fs.delete(outputDir, true); + SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, outputDir, + IntWritable.class, VectorWritable.class); + IntWritable topic = new IntWritable(); + VectorWritable vector = new VectorWritable(); + for (MatrixSlice slice : matrix) { + topic.set(slice.index()); + vector.set(slice.vector()); + writer.append(topic, vector); + } + writer.close(); + } + + public static Matrix read(Configuration conf, Path... 
modelPaths) throws IOException { + int numRows = -1; + int numCols = -1; + boolean sparse = false; + List<Pair<Integer, Vector>> rows = Lists.newArrayList(); + for (Path modelPath : modelPaths) { + for (Pair<IntWritable, VectorWritable> row + : new SequenceFileIterable<IntWritable, VectorWritable>(modelPath, true, conf)) { + rows.add(Pair.of(row.getFirst().get(), row.getSecond().get())); + numRows = Math.max(numRows, row.getFirst().get()); + sparse = !row.getSecond().get().isDense(); + if (numCols < 0) { + numCols = row.getSecond().get().size(); + } + } + } + if (rows.isEmpty()) { + throw new IOException(Arrays.toString(modelPaths) + " have no vectors in it"); + } + numRows++; + Vector[] arrayOfRows = new Vector[numRows]; + for (Pair<Integer, Vector> pair : rows) { + arrayOfRows[pair.getFirst()] = pair.getSecond(); + } + Matrix matrix; + if (sparse) { + matrix = new SparseRowMatrix(numRows, numCols, arrayOfRows); + } else { + matrix = new DenseMatrix(numRows, numCols); + for (int i = 0; i < numRows; i++) { + matrix.assignRow(i, arrayOfRows[i]); + } + } + return matrix; + } + + public static OpenObjectIntHashMap<String> readDictionary(Configuration conf, Path... dictPath) { + OpenObjectIntHashMap<String> dictionary = new OpenObjectIntHashMap<>(); + for (Path dictionaryFile : dictPath) { + for (Pair<Writable, IntWritable> record + : new SequenceFileIterable<Writable, IntWritable>(dictionaryFile, true, conf)) { + dictionary.put(record.getFirst().toString(), record.getSecond().get()); + } + } + return dictionary; + } + + public static String[] invertDictionary(OpenObjectIntHashMap<String> termIdMap) { + int maxTermId = -1; + for (String term : termIdMap.keys()) { + maxTermId = Math.max(maxTermId, termIdMap.get(term)); + } + maxTermId++; + String[] dictionary = new String[maxTermId]; + for (String term : termIdMap.keys()) { + dictionary[termIdMap.get(term)] = term; + } + return dictionary; + } + +}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/math/MultiLabelVectorWritable.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/math/MultiLabelVectorWritable.java b/mr/src/main/java/org/apache/mahout/math/MultiLabelVectorWritable.java new file mode 100644 index 0000000..0c45c9a --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/math/MultiLabelVectorWritable.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.math; + +import org.apache.hadoop.io.Writable; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * Writable to handle serialization of a vector and a variable list of + * associated label indexes. 
+ */ +public final class MultiLabelVectorWritable implements Writable { + + private final VectorWritable vectorWritable = new VectorWritable(); + private int[] labels; + + public MultiLabelVectorWritable() { + } + + public MultiLabelVectorWritable(Vector vector, int[] labels) { + this.vectorWritable.set(vector); + this.labels = labels; + } + + public Vector getVector() { + return vectorWritable.get(); + } + + public void setVector(Vector vector) { + vectorWritable.set(vector); + } + + public void setLabels(int[] labels) { + this.labels = labels; + } + + public int[] getLabels() { + return labels; + } + + @Override + public void readFields(DataInput in) throws IOException { + vectorWritable.readFields(in); + int labelSize = in.readInt(); + labels = new int[labelSize]; + for (int i = 0; i < labelSize; i++) { + labels[i] = in.readInt(); + } + } + + @Override + public void write(DataOutput out) throws IOException { + vectorWritable.write(out); + out.writeInt(labels.length); + for (int label : labels) { + out.writeInt(label); + } + } + + public static MultiLabelVectorWritable read(DataInput in) throws IOException { + MultiLabelVectorWritable writable = new MultiLabelVectorWritable(); + writable.readFields(in); + return writable; + } + + public static void write(DataOutput out, SequentialAccessSparseVector ssv, int[] labels) throws IOException { + new MultiLabelVectorWritable(ssv, labels).write(out); + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java b/mr/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java new file mode 100644 index 0000000..1a6ff16 --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java @@ -0,0 +1,385 @@ +/** + * Licensed to the Apache Software Foundation 
(ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.math.hadoop; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Iterator; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.Job; +import org.apache.mahout.common.Pair; +import org.apache.mahout.common.iterator.sequencefile.PathFilters; +import org.apache.mahout.common.iterator.sequencefile.PathType; +import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterator; +import org.apache.mahout.math.CardinalityException; +import org.apache.mahout.math.MatrixSlice; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorIterable; +import org.apache.mahout.math.VectorWritable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Function; +import com.google.common.collect.Iterators; + +/** + * DistributedRowMatrix is a FileSystem-backed 
VectorIterable in which the vectors live in a + * SequenceFile<WritableComparable,VectorWritable>, and distributed operations are executed as M/R passes on + * Hadoop. The usage is as follows: <p> + * <p> + * <pre> + * // the path must already contain an already created SequenceFile! + * DistributedRowMatrix m = new DistributedRowMatrix("path/to/vector/sequenceFile", "tmp/path", 10000000, 250000); + * m.setConf(new Configuration()); + * // now if we want to multiply a vector by this matrix, it's dimension must equal the row dimension of this + * // matrix. If we want to timesSquared() a vector by this matrix, its dimension must equal the column dimension + * // of the matrix. + * Vector v = new DenseVector(250000); + * // now the following operation will be done via a M/R pass via Hadoop. + * Vector w = m.timesSquared(v); + * </pre> + * + */ +public class DistributedRowMatrix implements VectorIterable, Configurable { + public static final String KEEP_TEMP_FILES = "DistributedMatrix.keep.temp.files"; + + private static final Logger log = LoggerFactory.getLogger(DistributedRowMatrix.class); + + private final Path inputPath; + private final Path outputTmpPath; + private Configuration conf; + private Path rowPath; + private Path outputTmpBasePath; + private final int numRows; + private final int numCols; + private boolean keepTempFiles; + + public DistributedRowMatrix(Path inputPath, + Path outputTmpPath, + int numRows, + int numCols) { + this(inputPath, outputTmpPath, numRows, numCols, false); + } + + public DistributedRowMatrix(Path inputPath, + Path outputTmpPath, + int numRows, + int numCols, + boolean keepTempFiles) { + this.inputPath = inputPath; + this.outputTmpPath = outputTmpPath; + this.numRows = numRows; + this.numCols = numCols; + this.keepTempFiles = keepTempFiles; + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + try { + FileSystem fs = 
FileSystem.get(inputPath.toUri(), conf); + rowPath = fs.makeQualified(inputPath); + outputTmpBasePath = fs.makeQualified(outputTmpPath); + keepTempFiles = conf.getBoolean(KEEP_TEMP_FILES, false); + } catch (IOException ioe) { + throw new IllegalStateException(ioe); + } + } + + public Path getRowPath() { + return rowPath; + } + + public Path getOutputTempPath() { + return outputTmpBasePath; + } + + public void setOutputTempPathString(String outPathString) { + try { + outputTmpBasePath = FileSystem.get(conf).makeQualified(new Path(outPathString)); + } catch (IOException ioe) { + log.warn("Unable to set outputBasePath to {}, leaving as {}", + outPathString, outputTmpBasePath); + } + } + + @Override + public Iterator<MatrixSlice> iterateAll() { + try { + Path pathPattern = rowPath; + if (FileSystem.get(conf).getFileStatus(rowPath).isDir()) { + pathPattern = new Path(rowPath, "*"); + } + return Iterators.transform( + new SequenceFileDirIterator<IntWritable,VectorWritable>(pathPattern, + PathType.GLOB, + PathFilters.logsCRCFilter(), + null, + true, + conf), + new Function<Pair<IntWritable,VectorWritable>,MatrixSlice>() { + @Override + public MatrixSlice apply(Pair<IntWritable, VectorWritable> from) { + return new MatrixSlice(from.getSecond().get(), from.getFirst().get()); + } + }); + } catch (IOException ioe) { + throw new IllegalStateException(ioe); + } + } + + @Override + public int numSlices() { + return numRows(); + } + + @Override + public int numRows() { + return numRows; + } + + @Override + public int numCols() { + return numCols; + } + + + /** + * This implements matrix this.transpose().times(other) + * @param other a DistributedRowMatrix + * @return a DistributedRowMatrix containing the product + */ + public DistributedRowMatrix times(DistributedRowMatrix other) throws IOException { + return times(other, new Path(outputTmpBasePath.getParent(), "productWith-" + (System.nanoTime() & 0xFF))); + } + + /** + * This implements matrix this.transpose().times(other) + * 
@param other a DistributedRowMatrix + * @param outPath path to write result to + * @return a DistributedRowMatrix containing the product + */ + public DistributedRowMatrix times(DistributedRowMatrix other, Path outPath) throws IOException { + if (numRows != other.numRows()) { + throw new CardinalityException(numRows, other.numRows()); + } + + Configuration initialConf = getConf() == null ? new Configuration() : getConf(); + Configuration conf = + MatrixMultiplicationJob.createMatrixMultiplyJobConf(initialConf, + rowPath, + other.rowPath, + outPath, + other.numCols); + JobClient.runJob(new JobConf(conf)); + DistributedRowMatrix out = new DistributedRowMatrix(outPath, outputTmpPath, numCols, other.numCols()); + out.setConf(conf); + return out; + } + + public Vector columnMeans() throws IOException { + return columnMeans("SequentialAccessSparseVector"); + } + + /** + * Returns the column-wise mean of a DistributedRowMatrix + * + * @param vectorClass + * desired class for the column-wise mean vector e.g. + * RandomAccessSparseVector, DenseVector + * @return Vector containing the column-wise mean of this + */ + public Vector columnMeans(String vectorClass) throws IOException { + Path outputVectorTmpPath = + new Path(outputTmpBasePath, new Path(Long.toString(System.nanoTime()))); + Configuration initialConf = + getConf() == null ? new Configuration() : getConf(); + String vectorClassFull = "org.apache.mahout.math." + vectorClass; + Vector mean = MatrixColumnMeansJob.run(initialConf, rowPath, outputVectorTmpPath, vectorClassFull); + if (!keepTempFiles) { + FileSystem fs = outputVectorTmpPath.getFileSystem(conf); + fs.delete(outputVectorTmpPath, true); + } + return mean; + } + + public DistributedRowMatrix transpose() throws IOException { + Path outputPath = new Path(rowPath.getParent(), "transpose-" + (System.nanoTime() & 0xFF)); + Configuration initialConf = getConf() == null ? 
new Configuration() : getConf(); + Job transposeJob = TransposeJob.buildTransposeJob(initialConf, rowPath, outputPath, numRows); + + try { + transposeJob.waitForCompletion(true); + } catch (Exception e) { + throw new IllegalStateException("transposition failed", e); + } + + DistributedRowMatrix m = new DistributedRowMatrix(outputPath, outputTmpPath, numCols, numRows); + m.setConf(this.conf); + return m; + } + + @Override + public Vector times(Vector v) { + try { + Configuration initialConf = getConf() == null ? new Configuration() : getConf(); + Path outputVectorTmpPath = new Path(outputTmpBasePath, new Path(Long.toString(System.nanoTime()))); + + Job job = TimesSquaredJob.createTimesJob(initialConf, v, numRows, rowPath, outputVectorTmpPath); + + try { + job.waitForCompletion(true); + } catch (Exception e) { + throw new IllegalStateException("times failed", e); + } + + Vector result = TimesSquaredJob.retrieveTimesSquaredOutputVector(outputVectorTmpPath, conf); + if (!keepTempFiles) { + FileSystem fs = outputVectorTmpPath.getFileSystem(conf); + fs.delete(outputVectorTmpPath, true); + } + return result; + } catch (IOException ioe) { + throw new IllegalStateException(ioe); + } + } + + @Override + public Vector timesSquared(Vector v) { + try { + Configuration initialConf = getConf() == null ? 
new Configuration() : getConf(); + Path outputVectorTmpPath = new Path(outputTmpBasePath, new Path(Long.toString(System.nanoTime()))); + + Job job = TimesSquaredJob.createTimesSquaredJob(initialConf, v, rowPath, outputVectorTmpPath); + + try { + job.waitForCompletion(true); + } catch (Exception e) { + throw new IllegalStateException("timesSquared failed", e); + } + + Vector result = TimesSquaredJob.retrieveTimesSquaredOutputVector(outputVectorTmpPath, conf); + if (!keepTempFiles) { + FileSystem fs = outputVectorTmpPath.getFileSystem(conf); + fs.delete(outputVectorTmpPath, true); + } + return result; + } catch (IOException ioe) { + throw new IllegalStateException(ioe); + } + } + + @Override + public Iterator<MatrixSlice> iterator() { + return iterateAll(); + } + + public static class MatrixEntryWritable implements WritableComparable<MatrixEntryWritable> { + private int row; + private int col; + private double val; + + public int getRow() { + return row; + } + + public void setRow(int row) { + this.row = row; + } + + public int getCol() { + return col; + } + + public void setCol(int col) { + this.col = col; + } + + public double getVal() { + return val; + } + + public void setVal(double val) { + this.val = val; + } + + @Override + public int compareTo(MatrixEntryWritable o) { + if (row > o.row) { + return 1; + } else if (row < o.row) { + return -1; + } else { + if (col > o.col) { + return 1; + } else if (col < o.col) { + return -1; + } else { + return 0; + } + } + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof MatrixEntryWritable)) { + return false; + } + MatrixEntryWritable other = (MatrixEntryWritable) o; + return row == other.row && col == other.col; + } + + @Override + public int hashCode() { + return row + 31 * col; + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(row); + out.writeInt(col); + out.writeDouble(val); + } + + @Override + public void readFields(DataInput in) throws IOException { + row 
= in.readInt(); + col = in.readInt(); + val = in.readDouble(); + } + + @Override + public String toString() { + return "(" + row + ',' + col + "):" + val; + } + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/math/hadoop/MatrixColumnMeansJob.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/math/hadoop/MatrixColumnMeansJob.java b/mr/src/main/java/org/apache/mahout/math/hadoop/MatrixColumnMeansJob.java new file mode 100644 index 0000000..b4f459a --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/math/hadoop/MatrixColumnMeansJob.java @@ -0,0 +1,236 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.mahout.math.hadoop; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.mahout.common.ClassUtils; +import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterator; +import org.apache.mahout.math.DenseVector; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorWritable; +import org.apache.mahout.math.function.Functions; + +import com.google.common.io.Closeables; + +/** + * MatrixColumnMeansJob is a job for calculating the column-wise mean of a + * DistributedRowMatrix. This job can be accessed using + * DistributedRowMatrix.columnMeans() + */ +public final class MatrixColumnMeansJob { + + public static final String VECTOR_CLASS = + "DistributedRowMatrix.columnMeans.vector.class"; + + private MatrixColumnMeansJob() { + } + + public static Vector run(Configuration conf, + Path inputPath, + Path outputVectorTmpPath) throws IOException { + return run(conf, inputPath, outputVectorTmpPath, null); + } + + /** + * Job for calculating column-wise mean of a DistributedRowMatrix + * + * @param initialConf + * @param inputPath + * path to DistributedRowMatrix input + * @param outputVectorTmpPath + * path for temporary files created during job + * @param vectorClass + * String of desired class for returned vector e.g. 
DenseVector, + * RandomAccessSparseVector (may be null for {@link DenseVector} ) + * @return Vector containing column-wise mean of DistributedRowMatrix + */ + public static Vector run(Configuration initialConf, + Path inputPath, + Path outputVectorTmpPath, + String vectorClass) throws IOException { + + try { + initialConf.set(VECTOR_CLASS, + vectorClass == null ? DenseVector.class.getName() + : vectorClass); + + Job job = new Job(initialConf, "MatrixColumnMeansJob"); + job.setJarByClass(MatrixColumnMeansJob.class); + + FileOutputFormat.setOutputPath(job, outputVectorTmpPath); + + outputVectorTmpPath.getFileSystem(job.getConfiguration()) + .delete(outputVectorTmpPath, true); + job.setNumReduceTasks(1); + FileOutputFormat.setOutputPath(job, outputVectorTmpPath); + FileInputFormat.addInputPath(job, inputPath); + job.setInputFormatClass(SequenceFileInputFormat.class); + job.setOutputFormatClass(SequenceFileOutputFormat.class); + FileOutputFormat.setOutputPath(job, outputVectorTmpPath); + + job.setMapperClass(MatrixColumnMeansMapper.class); + job.setReducerClass(MatrixColumnMeansReducer.class); + job.setMapOutputKeyClass(NullWritable.class); + job.setMapOutputValueClass(VectorWritable.class); + job.setOutputKeyClass(IntWritable.class); + job.setOutputValueClass(VectorWritable.class); + job.submit(); + job.waitForCompletion(true); + + Path tmpFile = new Path(outputVectorTmpPath, "part-r-00000"); + SequenceFileValueIterator<VectorWritable> iterator = + new SequenceFileValueIterator<>(tmpFile, true, initialConf); + try { + if (iterator.hasNext()) { + return iterator.next().get(); + } else { + return (Vector) Class.forName(vectorClass).getConstructor(int.class) + .newInstance(0); + } + } finally { + Closeables.close(iterator, true); + } + } catch (IOException ioe) { + throw ioe; + } catch (Throwable thr) { + throw new IOException(thr); + } + } + + /** + * Mapper for calculation of column-wise mean. 
+ */ + public static class MatrixColumnMeansMapper extends + Mapper<Writable, VectorWritable, NullWritable, VectorWritable> { + + private Vector runningSum; + private String vectorClass; + + @Override + public void setup(Context context) { + vectorClass = context.getConfiguration().get(VECTOR_CLASS); + } + + /** + * The mapper computes a running sum of the vectors the task has seen. + * Element 0 of the running sum vector contains a count of the number of + * vectors that have been seen. The remaining elements contain the + * column-wise running sum. Nothing is written at this stage + */ + @Override + public void map(Writable r, VectorWritable v, Context context) + throws IOException { + if (runningSum == null) { + /* + * If this is the first vector the mapper has seen, instantiate a new + * vector using the parameter VECTOR_CLASS + */ + runningSum = ClassUtils.instantiateAs(vectorClass, + Vector.class, + new Class<?>[] { int.class }, + new Object[] { v.get().size() + 1 }); + runningSum.set(0, 1); + runningSum.viewPart(1, v.get().size()).assign(v.get()); + } else { + runningSum.set(0, runningSum.get(0) + 1); + runningSum.viewPart(1, v.get().size()).assign(v.get(), Functions.PLUS); + } + } + + /** + * The column-wise sum is written at the cleanup stage. A single reducer is + * forced so null can be used for the key + */ + @Override + public void cleanup(Context context) throws InterruptedException, + IOException { + if (runningSum != null) { + context.write(NullWritable.get(), new VectorWritable(runningSum)); + } + } + + } + + /** + * The reducer adds the partial column-wise sums from each of the mappers to + * compute the total column-wise sum. The total sum is then divided by the + * total count of vectors to determine the column-wise mean. 
+ */ + public static class MatrixColumnMeansReducer extends + Reducer<NullWritable, VectorWritable, IntWritable, VectorWritable> { + + private static final IntWritable ONE = new IntWritable(1); + + private String vectorClass; + private Vector outputVector; + private final VectorWritable outputVectorWritable = new VectorWritable(); + + @Override + public void setup(Context context) { + vectorClass = context.getConfiguration().get(VECTOR_CLASS); + } + + @Override + public void reduce(NullWritable n, + Iterable<VectorWritable> vectors, + Context context) throws IOException, InterruptedException { + + /** + * Add together partial column-wise sums from mappers + */ + for (VectorWritable v : vectors) { + if (outputVector == null) { + outputVector = v.get(); + } else { + outputVector.assign(v.get(), Functions.PLUS); + } + } + + /** + * Divide total column-wise sum by count of vectors, which corresponds to + * the number of rows in the DistributedRowMatrix + */ + if (outputVector != null) { + outputVectorWritable.set(outputVector.viewPart(1, + outputVector.size() - 1) + .divide(outputVector.get(0))); + context.write(ONE, outputVectorWritable); + } else { + Vector emptyVector = ClassUtils.instantiateAs(vectorClass, + Vector.class, + new Class<?>[] { int.class }, + new Object[] { 0 }); + context.write(ONE, new VectorWritable(emptyVector)); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/math/hadoop/MatrixMultiplicationJob.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/math/hadoop/MatrixMultiplicationJob.java b/mr/src/main/java/org/apache/mahout/math/hadoop/MatrixMultiplicationJob.java new file mode 100644 index 0000000..48eda08 --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/math/hadoop/MatrixMultiplicationJob.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor 
license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.math.hadoop; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.SequenceFileInputFormat; +import org.apache.hadoop.mapred.SequenceFileOutputFormat; +import org.apache.hadoop.mapred.join.CompositeInputFormat; +import org.apache.hadoop.mapred.join.TupleWritable; +import org.apache.hadoop.util.ToolRunner; +import org.apache.mahout.common.AbstractJob; +import org.apache.mahout.math.RandomAccessSparseVector; +import org.apache.mahout.math.SequentialAccessSparseVector; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorWritable; +import org.apache.mahout.math.function.Functions; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * This still uses the old MR api and as with all things in Mahout that are MapReduce is now part of 
'mahout-mr'.
 * The old MR API used here will not be ported to the new MR API; this job is expected to be
 * superseded by the Spark-based linear algebra bindings.
 */

public class MatrixMultiplicationJob extends AbstractJob {

  /** Job configuration key carrying the cardinality of the product's row fragments. */
  private static final String OUT_CARD = "output.vector.cardinality";

  /**
   * Equivalent to {@link #createMatrixMultiplyJobConf(Configuration, Path, Path, Path, int)} starting
   * from a fresh {@link Configuration}.
   */
  public static Configuration createMatrixMultiplyJobConf(Path aPath,
                                                          Path bPath,
                                                          Path outPath,
                                                          int outCardinality) {
    Configuration base = new Configuration();
    return createMatrixMultiplyJobConf(base, aPath, bPath, outPath, outCardinality);
  }

  /**
   * Builds an old-API job configuration that inner-joins the rows of {@code aPath} and
   * {@code bPath} by key, scales row fragments in {@link MatrixMultiplyMapper}, and sums them per
   * output row in {@link MatrixMultiplicationReducer} (also used as combiner).
   *
   * @param initialConf base configuration, copied into the returned {@link JobConf}
   * @param aPath first input matrix
   * @param bPath second input matrix
   * @param outPath output directory
   * @param outCardinality size of the row fragments that make up the product
   * @return the populated job configuration
   */
  public static Configuration createMatrixMultiplyJobConf(Configuration initialConf,
                                                          Path aPath,
                                                          Path bPath,
                                                          Path outPath,
                                                          int outCardinality) {
    JobConf jobConf = new JobConf(initialConf, MatrixMultiplicationJob.class);

    // Input: inner join of both matrices on their row keys.
    jobConf.setInputFormat(CompositeInputFormat.class);
    String joinExpression =
        CompositeInputFormat.compose("inner", SequenceFileInputFormat.class, aPath, bPath);
    jobConf.set("mapred.join.expr", joinExpression);
    jobConf.setInt(OUT_CARD, outCardinality);

    // Output: SequenceFile of IntWritable -> VectorWritable.
    jobConf.setOutputFormat(SequenceFileOutputFormat.class);
    FileOutputFormat.setOutputPath(jobConf, outPath);

    jobConf.setMapperClass(MatrixMultiplyMapper.class);
    jobConf.setCombinerClass(MatrixMultiplicationReducer.class);
    jobConf.setReducerClass(MatrixMultiplicationReducer.class);

    jobConf.setMapOutputKeyClass(IntWritable.class);
    jobConf.setMapOutputValueClass(VectorWritable.class);
    jobConf.setOutputKeyClass(IntWritable.class);
    jobConf.setOutputValueClass(VectorWritable.class);
    return jobConf;
  }

  public static void main(String[] args) throws Exception {
    ToolRunner.run(new MatrixMultiplicationJob(), args);
  }

  /**
   * Command-line entry point: parses the matrix options and multiplies A by B, writing to
   * {@code outputPath} if given.
   *
   * @return 0 on success, -1 if the arguments could not be parsed
   */
  @Override
  public int run(String[] strings) throws Exception {
    addOption("numRowsA", "nra", "Number of rows of the first input matrix", true);
    addOption("numColsA", "nca", "Number of columns of the first input matrix", true);
    addOption("numRowsB", "nrb", "Number of rows of the second input matrix", true);

    addOption("numColsB", "ncb", "Number of columns of the second input matrix", true);
    addOption("inputPathA", "ia", "Path to the first input matrix", true);
    addOption("inputPathB", "ib", "Path to the second input matrix", true);

    addOption("outputPath", "op", "Path to the output matrix", false);

    Map<String, List<String>> parsedArgs = parseArguments(strings);
    if (parsedArgs == null) {
      return -1;
    }

    DistributedRowMatrix a = matrixFromOptions("inputPathA", "numRowsA", "numColsA");
    DistributedRowMatrix b = matrixFromOptions("inputPathB", "numRowsB", "numColsB");

    a.setConf(new Configuration(getConf()));
    b.setConf(new Configuration(getConf()));

    if (!hasOption("outputPath")) {
      a.times(b);
    } else {
      a.times(b, new Path(getOption("outputPath")));
    }

    return 0;
  }

  /** Builds an (unconfigured) matrix from the named path/size options and the shared temp dir. */
  private DistributedRowMatrix matrixFromOptions(String pathOption, String rowsOption, String colsOption) {
    return new DistributedRowMatrix(new Path(getOption(pathOption)),
                                    new Path(getOption("tempDir")),
                                    Integer.parseInt(getOption(rowsOption)),
                                    Integer.parseInt(getOption(colsOption)));
  }

  /**
   * For a joined pair of rows, picks as the "fragment" the one whose size equals the configured
   * output cardinality (the first one if both match). It emits that fragment scaled by each
   * non-zero of the other row, keyed by the non-zero's index.
   */
  public static class MatrixMultiplyMapper extends MapReduceBase
      implements Mapper<IntWritable,TupleWritable,IntWritable,VectorWritable> {

    private int outCardinality;
    private final IntWritable row = new IntWritable();

    @Override
    public void configure(JobConf conf) {
      outCardinality = conf.getInt(OUT_CARD, Integer.MAX_VALUE);
    }

    @Override
    public void map(IntWritable index,
                    TupleWritable v,
                    OutputCollector<IntWritable,VectorWritable> out,
                    Reporter reporter) throws IOException {
      Vector first = ((VectorWritable) v.get(0)).get();
      Vector second = ((VectorWritable) v.get(1)).get();

      Vector outFrag;
      Vector multiplier;
      if (first.size() == outCardinality) {
        outFrag = first;
        multiplier = second;
      } else {
        outFrag = second;
        multiplier = first;
      }

      VectorWritable outVector = new VectorWritable();
      for (Vector.Element element : multiplier.nonZeroes()) {
        row.set(element.index());
        outVector.set(outFrag.times(element.get()));
        out.collect(row, outVector);
      }
    }
  }

  /**
   * Sums all fragments of one output row in a random-access sparse accumulator and emits the total
   * as a {@code SequentialAccessSparseVector}.
   */
  public static class MatrixMultiplicationReducer extends MapReduceBase
      implements Reducer<IntWritable,VectorWritable,IntWritable,VectorWritable> {

    @Override
    public void reduce(IntWritable rowNum,
                       Iterator<VectorWritable> it,
                       OutputCollector<IntWritable,VectorWritable> out,
                       Reporter reporter) throws IOException {
      if (it.hasNext()) {
        Vector sum = new RandomAccessSparseVector(it.next().get());
        while (it.hasNext()) {
          sum.assign(it.next().get(), Functions.PLUS);
        }
        out.collect(rowNum, new VectorWritable(new SequentialAccessSparseVector(sum)));
      }
    }
  }

}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java b/mr/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java new file mode 100644 index 0000000..e234eb9 --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java @@ -0,0 +1,251 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.math.hadoop; + +import com.google.common.io.Closeables; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.filecache.DistributedCache; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.mahout.common.HadoopUtil; +import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterator; +import org.apache.mahout.math.DenseVector; +import org.apache.mahout.math.RandomAccessSparseVector; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorWritable; +import org.apache.mahout.math.function.Functions; + +import com.google.common.base.Preconditions; + +import java.io.IOException; +import java.net.URI; + +public final class TimesSquaredJob { + + public static final String INPUT_VECTOR = "DistributedMatrix.times.inputVector"; + public static final String IS_SPARSE_OUTPUT = "DistributedMatrix.times.outputVector.sparse"; + public static final String OUTPUT_VECTOR_DIMENSION = "DistributedMatrix.times.output.dimension"; + + public static final String OUTPUT_VECTOR_FILENAME = 
"DistributedMatrix.times.outputVector"; + + private TimesSquaredJob() { } + + public static Job createTimesSquaredJob(Vector v, Path matrixInputPath, Path outputVectorPath) + throws IOException { + return createTimesSquaredJob(new Configuration(), v, matrixInputPath, outputVectorPath); + } + + public static Job createTimesSquaredJob(Configuration initialConf, Vector v, Path matrixInputPath, + Path outputVectorPath) throws IOException { + + return createTimesSquaredJob(initialConf, v, matrixInputPath, outputVectorPath, TimesSquaredMapper.class, + VectorSummingReducer.class); + } + + public static Job createTimesJob(Vector v, int outDim, Path matrixInputPath, Path outputVectorPath) + throws IOException { + + return createTimesJob(new Configuration(), v, outDim, matrixInputPath, outputVectorPath); + } + + public static Job createTimesJob(Configuration initialConf, Vector v, int outDim, Path matrixInputPath, + Path outputVectorPath) throws IOException { + + return createTimesSquaredJob(initialConf, v, outDim, matrixInputPath, outputVectorPath, TimesMapper.class, + VectorSummingReducer.class); + } + + public static Job createTimesSquaredJob(Vector v, Path matrixInputPath, Path outputVectorPathBase, + Class<? extends TimesSquaredMapper> mapClass, Class<? extends VectorSummingReducer> redClass) throws IOException { + + return createTimesSquaredJob(new Configuration(), v, matrixInputPath, outputVectorPathBase, mapClass, redClass); + } + + public static Job createTimesSquaredJob(Configuration initialConf, Vector v, Path matrixInputPath, + Path outputVectorPathBase, Class<? extends TimesSquaredMapper> mapClass, + Class<? extends VectorSummingReducer> redClass) throws IOException { + + return createTimesSquaredJob(initialConf, v, v.size(), matrixInputPath, outputVectorPathBase, mapClass, redClass); + } + + public static Job createTimesSquaredJob(Vector v, int outputVectorDim, Path matrixInputPath, + Path outputVectorPathBase, Class<? 
extends TimesSquaredMapper> mapClass, + Class<? extends VectorSummingReducer> redClass) throws IOException { + + return createTimesSquaredJob(new Configuration(), v, outputVectorDim, matrixInputPath, outputVectorPathBase, + mapClass, redClass); + } + + public static Job createTimesSquaredJob(Configuration initialConf, Vector v, int outputVectorDim, + Path matrixInputPath, Path outputVectorPathBase, Class<? extends TimesSquaredMapper> mapClass, + Class<? extends VectorSummingReducer> redClass) throws IOException { + + FileSystem fs = FileSystem.get(matrixInputPath.toUri(), initialConf); + matrixInputPath = fs.makeQualified(matrixInputPath); + outputVectorPathBase = fs.makeQualified(outputVectorPathBase); + + long now = System.nanoTime(); + Path inputVectorPath = new Path(outputVectorPathBase, INPUT_VECTOR + '/' + now); + + + SequenceFile.Writer inputVectorPathWriter = null; + + try { + inputVectorPathWriter = new SequenceFile.Writer(fs, initialConf, inputVectorPath, NullWritable.class, + VectorWritable.class); + inputVectorPathWriter.append(NullWritable.get(), new VectorWritable(v)); + } finally { + Closeables.close(inputVectorPathWriter, false); + } + + URI ivpURI = inputVectorPath.toUri(); + DistributedCache.setCacheFiles(new URI[] { ivpURI }, initialConf); + + Job job = HadoopUtil.prepareJob(matrixInputPath, new Path(outputVectorPathBase, OUTPUT_VECTOR_FILENAME), + SequenceFileInputFormat.class, mapClass, NullWritable.class, VectorWritable.class, redClass, + NullWritable.class, VectorWritable.class, SequenceFileOutputFormat.class, initialConf); + job.setCombinerClass(redClass); + job.setJobName("TimesSquaredJob: " + matrixInputPath); + + Configuration conf = job.getConfiguration(); + conf.set(INPUT_VECTOR, ivpURI.toString()); + conf.setBoolean(IS_SPARSE_OUTPUT, !v.isDense()); + conf.setInt(OUTPUT_VECTOR_DIMENSION, outputVectorDim); + + return job; + } + + public static Vector retrieveTimesSquaredOutputVector(Path outputVectorTmpPath, Configuration conf) + throws 
IOException { + Path outputFile = new Path(outputVectorTmpPath, OUTPUT_VECTOR_FILENAME + "/part-r-00000"); + SequenceFileValueIterator<VectorWritable> iterator = + new SequenceFileValueIterator<>(outputFile, true, conf); + try { + return iterator.next().get(); + } finally { + Closeables.close(iterator, true); + } + } + + public static class TimesSquaredMapper<T extends WritableComparable> + extends Mapper<T,VectorWritable, NullWritable,VectorWritable> { + + private Vector outputVector; + private Vector inputVector; + + Vector getOutputVector() { + return outputVector; + } + + @Override + protected void setup(Context ctx) throws IOException, InterruptedException { + try { + Configuration conf = ctx.getConfiguration(); + Path[] localFiles = DistributedCache.getLocalCacheFiles(conf); + Preconditions.checkArgument(localFiles != null && localFiles.length >= 1, + "missing paths from the DistributedCache"); + + Path inputVectorPath = HadoopUtil.getSingleCachedFile(conf); + + SequenceFileValueIterator<VectorWritable> iterator = + new SequenceFileValueIterator<>(inputVectorPath, true, conf); + try { + inputVector = iterator.next().get(); + } finally { + Closeables.close(iterator, true); + } + + int outDim = conf.getInt(OUTPUT_VECTOR_DIMENSION, Integer.MAX_VALUE); + outputVector = conf.getBoolean(IS_SPARSE_OUTPUT, false) + ? 
new RandomAccessSparseVector(outDim, 10) + : new DenseVector(outDim); + } catch (IOException ioe) { + throw new IllegalStateException(ioe); + } + } + + @Override + protected void map(T key, VectorWritable v, Context context) throws IOException, InterruptedException { + + double d = scale(v); + if (d == 1.0) { + outputVector.assign(v.get(), Functions.PLUS); + } else if (d != 0.0) { + outputVector.assign(v.get(), Functions.plusMult(d)); + } + } + + protected double scale(VectorWritable v) { + return v.get().dot(inputVector); + } + + @Override + protected void cleanup(Context ctx) throws IOException, InterruptedException { + ctx.write(NullWritable.get(), new VectorWritable(outputVector)); + } + + } + + public static class TimesMapper extends TimesSquaredMapper<IntWritable> { + + + @Override + protected void map(IntWritable rowNum, VectorWritable v, Context context) throws IOException, InterruptedException { + double d = scale(v); + if (d != 0.0) { + getOutputVector().setQuick(rowNum.get(), d); + } + } + } + + public static class VectorSummingReducer extends Reducer<NullWritable,VectorWritable,NullWritable,VectorWritable> { + + private Vector outputVector; + + @Override + protected void setup(Context ctx) throws IOException, InterruptedException { + Configuration conf = ctx.getConfiguration(); + int outputDimension = conf.getInt(OUTPUT_VECTOR_DIMENSION, Integer.MAX_VALUE); + outputVector = conf.getBoolean(IS_SPARSE_OUTPUT, false) + ? 
new RandomAccessSparseVector(outputDimension, 10) + : new DenseVector(outputDimension); + } + + @Override + protected void reduce(NullWritable key, Iterable<VectorWritable> vectors, Context ctx) + throws IOException, InterruptedException { + + for (VectorWritable v : vectors) { + if (v != null) { + outputVector.assign(v.get(), Functions.PLUS); + } + } + ctx.write(NullWritable.get(), new VectorWritable(outputVector)); + } + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/math/hadoop/TransposeJob.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/math/hadoop/TransposeJob.java b/mr/src/main/java/org/apache/mahout/math/hadoop/TransposeJob.java new file mode 100644 index 0000000..60066c6 --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/math/hadoop/TransposeJob.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.math.hadoop; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.hadoop.util.ToolRunner; +import org.apache.mahout.common.AbstractJob; +import org.apache.mahout.common.HadoopUtil; +import org.apache.mahout.common.mapreduce.MergeVectorsCombiner; +import org.apache.mahout.common.mapreduce.MergeVectorsReducer; +import org.apache.mahout.common.mapreduce.TransposeMapper; +import org.apache.mahout.math.VectorWritable; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** Transpose a matrix */ +public class TransposeJob extends AbstractJob { + + public static void main(String[] args) throws Exception { + ToolRunner.run(new TransposeJob(), args); + } + + @Override + public int run(String[] strings) throws Exception { + addInputOption(); + addOption("numRows", "nr", "Number of rows of the input matrix"); + addOption("numCols", "nc", "Number of columns of the input matrix"); + Map<String, List<String>> parsedArgs = parseArguments(strings); + if (parsedArgs == null) { + return -1; + } + + int numRows = Integer.parseInt(getOption("numRows")); + int numCols = Integer.parseInt(getOption("numCols")); + + DistributedRowMatrix matrix = new DistributedRowMatrix(getInputPath(), getTempPath(), numRows, numCols); + matrix.setConf(new Configuration(getConf())); + matrix.transpose(); + + return 0; + } + + public static Job buildTransposeJob(Path matrixInputPath, Path matrixOutputPath, int numInputRows) + throws IOException { + return buildTransposeJob(new Configuration(), matrixInputPath, matrixOutputPath, numInputRows); + } + + public static Job buildTransposeJob(Configuration initialConf, Path matrixInputPath, Path matrixOutputPath, + int 
numInputRows) throws IOException { + + Job job = HadoopUtil.prepareJob(matrixInputPath, matrixOutputPath, SequenceFileInputFormat.class, + TransposeMapper.class, IntWritable.class, VectorWritable.class, MergeVectorsReducer.class, IntWritable.class, + VectorWritable.class, SequenceFileOutputFormat.class, initialConf); + job.setCombinerClass(MergeVectorsCombiner.class); + job.getConfiguration().setInt(TransposeMapper.NEW_NUM_COLS_PARAM, numInputRows); + + job.setJobName("TransposeJob: " + matrixInputPath); + + return job; + } + + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/DistributedLanczosSolver.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/DistributedLanczosSolver.java b/mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/DistributedLanczosSolver.java new file mode 100644 index 0000000..89dddcc --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/DistributedLanczosSolver.java @@ -0,0 +1,298 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.math.hadoop.decomposer; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import com.google.common.io.Closeables; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.mahout.common.AbstractJob; +import org.apache.mahout.math.DenseVector; +import org.apache.mahout.math.NamedVector; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorIterable; +import org.apache.mahout.math.VectorWritable; +import org.apache.mahout.math.decomposer.lanczos.LanczosSolver; +import org.apache.mahout.math.decomposer.lanczos.LanczosState; +import org.apache.mahout.math.hadoop.DistributedRowMatrix; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * See the SSVD code for a better option than using this: + * + * http://mahout.apache.org/users/dim-reduction/ssvd.html + * @see org.apache.mahout.math.hadoop.stochasticsvd.SSVDSolver + */ +public class DistributedLanczosSolver extends LanczosSolver implements Tool { + + public static final String RAW_EIGENVECTORS = "rawEigenvectors"; + + private static final Logger log = LoggerFactory.getLogger(DistributedLanczosSolver.class); + + private Configuration conf; + + private Map<String, List<String>> parsedArgs; + + /** + * For the distributed case, the best guess at a useful initialization state for Lanczos we'll chose to be + * uniform over all input dimensions, L_2 normalized. 
+ */ + public static Vector getInitialVector(VectorIterable corpus) { + Vector initialVector = new DenseVector(corpus.numCols()); + initialVector.assign(1.0 / Math.sqrt(corpus.numCols())); + return initialVector; + } + + public LanczosState runJob(Configuration originalConfig, + LanczosState state, + int desiredRank, + boolean isSymmetric, + String outputEigenVectorPathString) throws IOException { + ((Configurable) state.getCorpus()).setConf(new Configuration(originalConfig)); + setConf(originalConfig); + solve(state, desiredRank, isSymmetric); + serializeOutput(state, new Path(outputEigenVectorPathString)); + return state; + } + + /** + * Factored-out LanczosSolver for the purpose of invoking it programmatically + */ + public LanczosState runJob(Configuration originalConfig, + Path inputPath, + Path outputTmpPath, + int numRows, + int numCols, + boolean isSymmetric, + int desiredRank, + String outputEigenVectorPathString) throws IOException { + DistributedRowMatrix matrix = new DistributedRowMatrix(inputPath, outputTmpPath, numRows, numCols); + matrix.setConf(new Configuration(originalConfig)); + LanczosState state = new LanczosState(matrix, desiredRank, getInitialVector(matrix)); + return runJob(originalConfig, state, desiredRank, isSymmetric, outputEigenVectorPathString); + } + + @Override + public int run(String[] strings) throws Exception { + Path inputPath = new Path(AbstractJob.getOption(parsedArgs, "--input")); + Path outputPath = new Path(AbstractJob.getOption(parsedArgs, "--output")); + Path outputTmpPath = new Path(AbstractJob.getOption(parsedArgs, "--tempDir")); + Path workingDirPath = AbstractJob.getOption(parsedArgs, "--workingDir") != null + ? 
new Path(AbstractJob.getOption(parsedArgs, "--workingDir")) : null; + int numRows = Integer.parseInt(AbstractJob.getOption(parsedArgs, "--numRows")); + int numCols = Integer.parseInt(AbstractJob.getOption(parsedArgs, "--numCols")); + boolean isSymmetric = Boolean.parseBoolean(AbstractJob.getOption(parsedArgs, "--symmetric")); + int desiredRank = Integer.parseInt(AbstractJob.getOption(parsedArgs, "--rank")); + + boolean cleansvd = Boolean.parseBoolean(AbstractJob.getOption(parsedArgs, "--cleansvd")); + if (cleansvd) { + double maxError = Double.parseDouble(AbstractJob.getOption(parsedArgs, "--maxError")); + double minEigenvalue = Double.parseDouble(AbstractJob.getOption(parsedArgs, "--minEigenvalue")); + boolean inMemory = Boolean.parseBoolean(AbstractJob.getOption(parsedArgs, "--inMemory")); + return run(inputPath, + outputPath, + outputTmpPath, + workingDirPath, + numRows, + numCols, + isSymmetric, + desiredRank, + maxError, + minEigenvalue, + inMemory); + } + return run(inputPath, outputPath, outputTmpPath, workingDirPath, numRows, numCols, isSymmetric, desiredRank); + } + + /** + * Run the solver to produce raw eigenvectors, then run the EigenVerificationJob to clean them + * + * @param inputPath the Path to the input corpus + * @param outputPath the Path to the output + * @param outputTmpPath a Path to a temporary working directory + * @param numRows the int number of rows + * @param numCols the int number of columns + * @param isSymmetric true if the input matrix is symmetric + * @param desiredRank the int desired rank of eigenvectors to produce + * @param maxError the maximum allowable error + * @param minEigenvalue the minimum usable eigenvalue + * @param inMemory true if the verification can be done in memory + * @return an int indicating success (0) or otherwise + */ + public int run(Path inputPath, + Path outputPath, + Path outputTmpPath, + Path workingDirPath, + int numRows, + int numCols, + boolean isSymmetric, + int desiredRank, + double maxError, + 
double minEigenvalue, + boolean inMemory) throws Exception { + int result = run(inputPath, outputPath, outputTmpPath, workingDirPath, numRows, numCols, + isSymmetric, desiredRank); + if (result != 0) { + return result; + } + Path rawEigenVectorPath = new Path(outputPath, RAW_EIGENVECTORS); + return new EigenVerificationJob().run(inputPath, + rawEigenVectorPath, + outputPath, + outputTmpPath, + maxError, + minEigenvalue, + inMemory, + getConf() != null ? new Configuration(getConf()) : new Configuration()); + } + + /** + * Run the solver to produce the raw eigenvectors + * + * @param inputPath the Path to the input corpus + * @param outputPath the Path to the output + * @param outputTmpPath a Path to a temporary working directory + * @param numRows the int number of rows + * @param numCols the int number of columns + * @param isSymmetric true if the input matrix is symmetric + * @param desiredRank the int desired rank of eigenvectors to produce + * @return an int indicating success (0) or otherwise + */ + public int run(Path inputPath, + Path outputPath, + Path outputTmpPath, + Path workingDirPath, + int numRows, + int numCols, + boolean isSymmetric, + int desiredRank) throws Exception { + DistributedRowMatrix matrix = new DistributedRowMatrix(inputPath, outputTmpPath, numRows, numCols); + matrix.setConf(new Configuration(getConf() != null ? 
getConf() : new Configuration())); + + LanczosState state; + if (workingDirPath == null) { + state = new LanczosState(matrix, desiredRank, getInitialVector(matrix)); + } else { + HdfsBackedLanczosState hState = + new HdfsBackedLanczosState(matrix, desiredRank, getInitialVector(matrix), workingDirPath); + hState.setConf(matrix.getConf()); + state = hState; + } + solve(state, desiredRank, isSymmetric); + + Path outputEigenVectorPath = new Path(outputPath, RAW_EIGENVECTORS); + serializeOutput(state, outputEigenVectorPath); + return 0; + } + + /** + * @param state The final LanczosState to be serialized + * @param outputPath The path (relative to the current Configuration's FileSystem) to save the output to. + */ + public void serializeOutput(LanczosState state, Path outputPath) throws IOException { + int numEigenVectors = state.getIterationNumber(); + log.info("Persisting {} eigenVectors and eigenValues to: {}", numEigenVectors, outputPath); + Configuration conf = getConf() != null ? getConf() : new Configuration(); + FileSystem fs = FileSystem.get(outputPath.toUri(), conf); + SequenceFile.Writer seqWriter = + new SequenceFile.Writer(fs, conf, outputPath, IntWritable.class, VectorWritable.class); + try { + IntWritable iw = new IntWritable(); + for (int i = 0; i < numEigenVectors; i++) { + // Persist eigenvectors sorted by eigenvalues in descending order\ + NamedVector v = new NamedVector(state.getRightSingularVector(numEigenVectors - 1 - i), + "eigenVector" + i + ", eigenvalue = " + state.getSingularValue(numEigenVectors - 1 - i)); + Writable vw = new VectorWritable(v); + iw.set(i); + seqWriter.append(iw, vw); + } + } finally { + Closeables.close(seqWriter, false); + } + } + + @Override + public void setConf(Configuration configuration) { + conf = configuration; + } + + @Override + public Configuration getConf() { + return conf; + } + + public DistributedLanczosSolverJob job() { + return new DistributedLanczosSolverJob(); + } + + /** + * Inner subclass of AbstractJob 
so we get access to AbstractJob's functionality w.r.t. cmdline options, but still + * sublcass LanczosSolver. + */ + public class DistributedLanczosSolverJob extends AbstractJob { + @Override + public void setConf(Configuration conf) { + DistributedLanczosSolver.this.setConf(conf); + } + + @Override + public Configuration getConf() { + return DistributedLanczosSolver.this.getConf(); + } + + @Override + public int run(String[] args) throws Exception { + addInputOption(); + addOutputOption(); + addOption("numRows", "nr", "Number of rows of the input matrix"); + addOption("numCols", "nc", "Number of columns of the input matrix"); + addOption("rank", "r", "Desired decomposition rank (note: only roughly 1/4 to 1/3 " + + "of these will have the top portion of the spectrum)"); + addOption("symmetric", "sym", "Is the input matrix square and symmetric?"); + addOption("workingDir", "wd", "Working directory path to store Lanczos basis vectors " + + "(to be used on restarts, and to avoid too much RAM usage)"); + // options required to run cleansvd job + addOption("cleansvd", "cl", "Run the EigenVerificationJob to clean the eigenvectors after SVD", false); + addOption("maxError", "err", "Maximum acceptable error", "0.05"); + addOption("minEigenvalue", "mev", "Minimum eigenvalue to keep the vector for", "0.0"); + addOption("inMemory", "mem", "Buffer eigen matrix into memory (if you have enough!)", "false"); + + DistributedLanczosSolver.this.parsedArgs = parseArguments(args); + if (DistributedLanczosSolver.this.parsedArgs == null) { + return -1; + } else { + return DistributedLanczosSolver.this.run(args); + } + } + } + + public static void main(String[] args) throws Exception { + ToolRunner.run(new DistributedLanczosSolver().job(), args); + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVector.java ---------------------------------------------------------------------- diff --git 
a/mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVector.java b/mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVector.java new file mode 100644 index 0000000..d2f0c8c --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVector.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.math.hadoop.decomposer; + +import org.apache.mahout.math.DenseVector; +import org.apache.mahout.math.NamedVector; +import org.apache.mahout.math.Vector; + +import java.util.regex.Pattern; + +/** + * TODO this is a horrible hack. Make a proper writable subclass also. + */ +public class EigenVector extends NamedVector { + + private static final Pattern EQUAL_PATTERN = Pattern.compile(" = "); + private static final Pattern PIPE_PATTERN = Pattern.compile("\\|"); + + public EigenVector(Vector v, double eigenValue, double cosAngleError, int order) { + super(v instanceof DenseVector ? 
(DenseVector) v : new DenseVector(v), + "e|" + order + "| = |" + eigenValue + "|, err = " + cosAngleError); + } + + public double getEigenValue() { + return getEigenValue(getName()); + } + + public double getCosAngleError() { + return getCosAngleError(getName()); + } + + public int getIndex() { + return getIndex(getName()); + } + + public static double getEigenValue(CharSequence name) { + return parseMetaData(name)[1]; + } + + public static double getCosAngleError(CharSequence name) { + return parseMetaData(name)[2]; + } + + public static int getIndex(CharSequence name) { + return (int)parseMetaData(name)[0]; + } + + public static double[] parseMetaData(CharSequence name) { + double[] m = new double[3]; + String[] s = EQUAL_PATTERN.split(name); + m[0] = Double.parseDouble(PIPE_PATTERN.split(s[0])[1]); + m[1] = Double.parseDouble(PIPE_PATTERN.split(s[1])[1]); + m[2] = Double.parseDouble(s[2].substring(1)); + return m; + } + + protected double[] parseMetaData() { + return parseMetaData(getName()); + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java b/mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java new file mode 100644 index 0000000..a7eaaed --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java @@ -0,0 +1,332 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.math.hadoop.decomposer; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.io.Closeables; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.util.ToolRunner; +import org.apache.mahout.common.AbstractJob; +import org.apache.mahout.common.commandline.DefaultOptionCreator; +import org.apache.mahout.math.MatrixSlice; +import org.apache.mahout.math.SparseRowMatrix; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorIterable; +import org.apache.mahout.math.VectorWritable; +import org.apache.mahout.math.decomposer.EigenStatus; +import org.apache.mahout.math.decomposer.SimpleEigenVerifier; +import org.apache.mahout.math.decomposer.SingularVectorVerifier; +import org.apache.mahout.math.hadoop.DistributedRowMatrix; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * <p> + * Class for taking the output of an eigendecomposition (specified as a Path location), and verifies correctness, in + * terms of the following: if you have a vector e, and a matrix m, then let e' = m.timesSquared(v); the error w.r.t. 
+ * eigenvector-ness is the cosine of the angle between e and e': + * </p> + * + * <pre> + * error(e,e') = e.dot(e') / (e.norm(2)*e'.norm(2)) + * </pre> + * <p> + * A set of eigenvectors should also all be very close to orthogonal, so this job computes all inner products between + * eigenvectors, and checks that this is close to the identity matrix. + * </p> + * <p> + * Parameters used in the cleanup (other than in the input/output path options) include --minEigenvalue, which specifies + * the value below which eigenvector/eigenvalue pairs will be discarded, and --maxError, which specifies the maximum + * error (as defined above) to be tolerated in an eigenvector. + * </p> + * <p> + * If all the eigenvectors can fit in memory, --inMemory allows for a speedier completion of this task by doing so. + * </p> + */ +public class EigenVerificationJob extends AbstractJob { + + public static final String CLEAN_EIGENVECTORS = "cleanEigenvectors"; + + private static final Logger log = LoggerFactory.getLogger(EigenVerificationJob.class); + + private SingularVectorVerifier eigenVerifier; + + private VectorIterable eigensToVerify; + + private VectorIterable corpus; + + private double maxError; + + private double minEigenValue; + + // private boolean loadEigensInMemory; + + private Path tmpOut; + + private Path outPath; + + private int maxEigensToKeep; + + private Path cleanedEigensPath; + + public void setEigensToVerify(VectorIterable eigens) { + eigensToVerify = eigens; + } + + @Override + public int run(String[] args) throws Exception { + Map<String,List<String>> argMap = handleArgs(args); + if (argMap == null) { + return -1; + } + if (argMap.isEmpty()) { + return 0; + } + // parse out the arguments + runJob(getConf(), new Path(getOption("eigenInput")), new Path(getOption("corpusInput")), getOutputPath(), + getOption("inMemory") != null, Double.parseDouble(getOption("maxError")), + // Double.parseDouble(getOption("minEigenvalue")), + Integer.parseInt(getOption("maxEigens"))); 
+ return 0; + } + + /** + * Run the job with the given arguments + * + * @param corpusInput + * the corpus input Path + * @param eigenInput + * the eigenvector input Path + * @param output + * the output Path + * @param tempOut + * temporary output Path + * @param maxError + * a double representing the maximum error + * @param minEigenValue + * a double representing the minimum eigenvalue + * @param inMemory + * a boolean requesting in-memory preparation + * @param conf + * the Configuration to use, or null if a default is ok (saves referencing Configuration in calling classes + * unless needed) + */ + public int run(Path corpusInput, Path eigenInput, Path output, Path tempOut, double maxError, double minEigenValue, + boolean inMemory, Configuration conf) throws IOException { + this.outPath = output; + this.tmpOut = tempOut; + this.maxError = maxError; + this.minEigenValue = minEigenValue; + + if (eigenInput != null && eigensToVerify == null) { + prepareEigens(conf, eigenInput, inMemory); + } + DistributedRowMatrix c = new DistributedRowMatrix(corpusInput, tempOut, 1, 1); + c.setConf(conf); + corpus = c; + + // set up eigenverifier and orthoverifier TODO: allow multithreaded execution + + eigenVerifier = new SimpleEigenVerifier(); + + // we don't currently verify orthonormality here. 
+ // VectorIterable pairwiseInnerProducts = computePairwiseInnerProducts(); + + Map<MatrixSlice,EigenStatus> eigenMetaData = verifyEigens(); + + List<Map.Entry<MatrixSlice,EigenStatus>> prunedEigenMeta = pruneEigens(eigenMetaData); + + saveCleanEigens(new Configuration(), prunedEigenMeta); + return 0; + } + + private Map<String,List<String>> handleArgs(String[] args) throws IOException { + addOutputOption(); + addOption("eigenInput", "ei", + "The Path for purported eigenVector input files (SequenceFile<WritableComparable,VectorWritable>.", null); + addOption("corpusInput", "ci", "The Path for corpus input files (SequenceFile<WritableComparable,VectorWritable>."); + addOption(DefaultOptionCreator.outputOption().create()); + addOption(DefaultOptionCreator.helpOption()); + addOption("inMemory", "mem", "Buffer eigen matrix into memory (if you have enough!)", "false"); + addOption("maxError", "err", "Maximum acceptable error", "0.05"); + addOption("minEigenvalue", "mev", "Minimum eigenvalue to keep the vector for", "0.0"); + addOption("maxEigens", "max", "Maximum number of eigenvectors to keep (0 means all)", "0"); + + return parseArguments(args); + } + + private void saveCleanEigens(Configuration conf, Collection<Map.Entry<MatrixSlice,EigenStatus>> prunedEigenMeta) + throws IOException { + Path path = new Path(outPath, CLEAN_EIGENVECTORS); + FileSystem fs = FileSystem.get(path.toUri(), conf); + SequenceFile.Writer seqWriter = new SequenceFile.Writer(fs, conf, path, IntWritable.class, VectorWritable.class); + try { + IntWritable iw = new IntWritable(); + int numEigensWritten = 0; + int index = 0; + for (Map.Entry<MatrixSlice,EigenStatus> pruneSlice : prunedEigenMeta) { + MatrixSlice s = pruneSlice.getKey(); + EigenStatus meta = pruneSlice.getValue(); + EigenVector ev = new EigenVector(s.vector(), meta.getEigenValue(), Math.abs(1 - meta.getCosAngle()), s.index()); + // log.info("appending {} to {}", ev, path); + Writable vw = new VectorWritable(ev); + iw.set(index++); + 
seqWriter.append(iw, vw); + + // increment the number of eigenvectors written and see if we've + // reached our specified limit, or if we wish to write all eigenvectors + // (latter is built-in, since numEigensWritten will always be > 0 + numEigensWritten++; + if (numEigensWritten == maxEigensToKeep) { + log.info("{} of the {} total eigens have been written", maxEigensToKeep, prunedEigenMeta.size()); + break; + } + } + } finally { + Closeables.close(seqWriter, false); + } + cleanedEigensPath = path; + } + + private List<Map.Entry<MatrixSlice,EigenStatus>> pruneEigens(Map<MatrixSlice,EigenStatus> eigenMetaData) { + List<Map.Entry<MatrixSlice,EigenStatus>> prunedEigenMeta = Lists.newArrayList(); + + for (Map.Entry<MatrixSlice,EigenStatus> entry : eigenMetaData.entrySet()) { + if (Math.abs(1 - entry.getValue().getCosAngle()) < maxError && entry.getValue().getEigenValue() > minEigenValue) { + prunedEigenMeta.add(entry); + } + } + + Collections.sort(prunedEigenMeta, new Comparator<Map.Entry<MatrixSlice,EigenStatus>>() { + @Override + public int compare(Map.Entry<MatrixSlice,EigenStatus> e1, Map.Entry<MatrixSlice,EigenStatus> e2) { + // sort eigens on eigenvalues in descending order + Double eg1 = e1.getValue().getEigenValue(); + Double eg2 = e2.getValue().getEigenValue(); + return eg1.compareTo(eg2); + } + }); + + // iterate thru' the eigens, pick up ones with max orthogonality with the selected ones + List<Map.Entry<MatrixSlice,EigenStatus>> selectedEigenMeta = Lists.newArrayList(); + Map.Entry<MatrixSlice,EigenStatus> e1 = prunedEigenMeta.remove(0); + selectedEigenMeta.add(e1); + int selectedEigenMetaLength = selectedEigenMeta.size(); + int prunedEigenMetaLength = prunedEigenMeta.size(); + + while (prunedEigenMetaLength > 0) { + double sum = Double.MAX_VALUE; + int index = 0; + for (int i = 0; i < prunedEigenMetaLength; i++) { + Map.Entry<MatrixSlice,EigenStatus> e = prunedEigenMeta.get(i); + double tmp = 0; + for (int j = 0; j < selectedEigenMetaLength; j++) { + 
Map.Entry<MatrixSlice,EigenStatus> ee = selectedEigenMeta.get(j); + tmp += ee.getKey().vector().times(e.getKey().vector()).norm(2); + } + if (tmp < sum) { + sum = tmp; + index = i; + } + } + Map.Entry<MatrixSlice,EigenStatus> e = prunedEigenMeta.remove(index); + selectedEigenMeta.add(e); + selectedEigenMetaLength++; + prunedEigenMetaLength--; + } + + return selectedEigenMeta; + } + + private Map<MatrixSlice,EigenStatus> verifyEigens() { + Map<MatrixSlice,EigenStatus> eigenMetaData = Maps.newHashMap(); + + for (MatrixSlice slice : eigensToVerify) { + EigenStatus status = eigenVerifier.verify(corpus, slice.vector()); + eigenMetaData.put(slice, status); + } + return eigenMetaData; + } + + private void prepareEigens(Configuration conf, Path eigenInput, boolean inMemory) { + DistributedRowMatrix eigens = new DistributedRowMatrix(eigenInput, tmpOut, 1, 1); + eigens.setConf(conf); + if (inMemory) { + List<Vector> eigenVectors = Lists.newArrayList(); + for (MatrixSlice slice : eigens) { + eigenVectors.add(slice.vector()); + } + eigensToVerify = new SparseRowMatrix(eigenVectors.size(), eigenVectors.get(0).size(), + eigenVectors.toArray(new Vector[eigenVectors.size()]), true, true); + + } else { + eigensToVerify = eigens; + } + } + + public Path getCleanedEigensPath() { + return cleanedEigensPath; + } + + public static void main(String[] args) throws Exception { + ToolRunner.run(new EigenVerificationJob(), args); + } + + /** + * Progammatic invocation of run() + * + * @param eigenInput + * Output of LanczosSolver + * @param corpusInput + * Input of LanczosSolver + */ + public void runJob(Configuration conf, Path eigenInput, Path corpusInput, Path output, boolean inMemory, + double maxError, int maxEigens) throws IOException { + // no need to handle command line arguments + outPath = output; + tmpOut = new Path(outPath, "tmp"); + maxEigensToKeep = maxEigens; + this.maxError = maxError; + if (eigenInput != null && eigensToVerify == null) { + prepareEigens(new 
Configuration(conf), eigenInput, inMemory); + } + + DistributedRowMatrix c = new DistributedRowMatrix(corpusInput, tmpOut, 1, 1); + c.setConf(new Configuration(conf)); + corpus = c; + + eigenVerifier = new SimpleEigenVerifier(); + + Map<MatrixSlice,EigenStatus> eigenMetaData = verifyEigens(); + List<Map.Entry<MatrixSlice,EigenStatus>> prunedEigenMeta = pruneEigens(eigenMetaData); + saveCleanEigens(conf, prunedEigenMeta); + } +}
