http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/decomposer/hebbian/HebbianUpdater.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/decomposer/hebbian/HebbianUpdater.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/decomposer/hebbian/HebbianUpdater.java new file mode 100644 index 0000000..2080c3a --- /dev/null +++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/decomposer/hebbian/HebbianUpdater.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.math.decomposer.hebbian; + + +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.function.PlusMult; + +public class HebbianUpdater implements EigenUpdater { + + @Override + public void update(Vector pseudoEigen, + Vector trainingVector, + TrainingState currentState) { + double trainingVectorNorm = trainingVector.norm(2); + int numPreviousEigens = currentState.getNumEigensProcessed(); + if (numPreviousEigens > 0 && currentState.isFirstPass()) { + updateTrainingProjectionsVector(currentState, trainingVector, numPreviousEigens - 1); + } + if (currentState.getActivationDenominatorSquared() == 0 || trainingVectorNorm == 0) { + if (currentState.getActivationDenominatorSquared() == 0) { + pseudoEigen.assign(trainingVector, new PlusMult(1)); + currentState.setHelperVector(currentState.currentTrainingProjection().clone()); + double helperNorm = currentState.getHelperVector().norm(2); + currentState.setActivationDenominatorSquared(trainingVectorNorm * trainingVectorNorm - helperNorm * helperNorm); + } + return; + } + currentState.setActivationNumerator(pseudoEigen.dot(trainingVector)); + currentState.setActivationNumerator( + currentState.getActivationNumerator() + - currentState.getHelperVector().dot(currentState.currentTrainingProjection())); + + double activation = currentState.getActivationNumerator() + / Math.sqrt(currentState.getActivationDenominatorSquared()); + currentState.setActivationDenominatorSquared( + currentState.getActivationDenominatorSquared() + + 2 * activation * currentState.getActivationNumerator() + + activation * activation + * (trainingVector.getLengthSquared() - currentState.currentTrainingProjection().getLengthSquared())); + if (numPreviousEigens > 0) { + currentState.getHelperVector().assign(currentState.currentTrainingProjection(), new PlusMult(activation)); + } + pseudoEigen.assign(trainingVector, new PlusMult(activation)); + } + + private static void updateTrainingProjectionsVector(TrainingState state, + Vector trainingVector, + int previousEigenIndex) { + Vector previousEigen = state.mostRecentEigen(); + Vector 
currentTrainingVectorProjection = state.currentTrainingProjection();
+    double projection = previousEigen.dot(trainingVector);
+    currentTrainingVectorProjection.set(previousEigenIndex, projection);
+  }
+
+}
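
[Editorial aside, not part of this commit.] For orientation, here is a minimal, hypothetical driver for the two classes in this file and the next (HebbianUpdater and TrainingState). It is a sketch under stated assumptions: it must live in the same package because the TrainingState constructor shown below is package-private, it uses a tiny in-memory corpus, it runs a fixed number of passes rather than the convergence test the real HebbianSolver performs, and the class name HebbianDriverSketch is invented for illustration.

package org.apache.mahout.math.decomposer.hebbian;

import org.apache.mahout.math.DenseMatrix;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.Matrix;
import org.apache.mahout.math.Vector;

// Hypothetical driver sketch; not part of this patch.
public final class HebbianDriverSketch {

  public static void main(String[] args) {
    int numFeatures = 4;
    int desiredRank = 2;
    Matrix corpus = new DenseMatrix(new double[][] {
        {1, 2, 0, 1},
        {0, 1, 3, 0},
        {2, 0, 1, 1}
    });
    // Row i of 'eigens' accumulates the i-th pseudo-eigenvector;
    // 'projections' caches each training row's projection onto prior eigens.
    Matrix eigens = new DenseMatrix(desiredRank, numFeatures);
    Matrix projections = new DenseMatrix(corpus.numRows(), desiredRank);
    TrainingState state = new TrainingState(eigens, projections); // package-private ctor
    HebbianUpdater updater = new HebbianUpdater();

    for (int eigenIndex = 0; eigenIndex < desiredRank; eigenIndex++) {
      Vector pseudoEigen = eigens.viewRow(eigenIndex);
      for (int pass = 0; pass < 10; pass++) { // fixed pass count; the real solver checks convergence
        state.setFirstPass(pass == 0);
        for (int row = 0; row < corpus.numRows(); row++) {
          state.setTrainingIndex(row);
          updater.update(pseudoEigen, corpus.viewRow(row), state);
        }
      }
      eigens.assignRow(eigenIndex, pseudoEigen.normalize());
      state.setNumEigensProcessed(eigenIndex + 1);
      // Reset per-eigen accumulators before training the next eigenvector.
      state.setHelperVector(new DenseVector(desiredRank));
      state.setActivationDenominatorSquared(0);
    }
    System.out.println(eigens);
  }
}
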
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/decomposer/hebbian/TrainingState.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/decomposer/hebbian/TrainingState.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/decomposer/hebbian/TrainingState.java new file mode 100644 index 0000000..af6c2ef --- /dev/null +++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/decomposer/hebbian/TrainingState.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.math.decomposer.hebbian; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.mahout.math.DenseVector; +import org.apache.mahout.math.Matrix; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.decomposer.EigenStatus; + +public class TrainingState { + + private Matrix currentEigens; + private int numEigensProcessed; + private List<Double> currentEigenValues; + private Matrix trainingProjections; + private int trainingIndex; + private Vector helperVector; + private boolean firstPass; + private List<EigenStatus> statusProgress; + private double activationNumerator; + private double activationDenominatorSquared; + + TrainingState(Matrix eigens, Matrix projections) { + currentEigens = eigens; + trainingProjections = projections; + trainingIndex = 0; + helperVector = new DenseVector(eigens.numRows()); + firstPass = true; + statusProgress = new ArrayList<>(); + activationNumerator = 0; + activationDenominatorSquared = 0; + numEigensProcessed = 0; + } + + public Vector mostRecentEigen() { + return currentEigens.viewRow(numEigensProcessed - 1); + } + + public Vector currentTrainingProjection() { + if (trainingProjections.viewRow(trainingIndex) == null) { + trainingProjections.assignRow(trainingIndex, new DenseVector(currentEigens.numCols())); + } + return trainingProjections.viewRow(trainingIndex); + } + + public Matrix getCurrentEigens() { + return currentEigens; + } + + public void setCurrentEigens(Matrix currentEigens) { + this.currentEigens = currentEigens; + } + + public int getNumEigensProcessed() { + return numEigensProcessed; + } + + public void setNumEigensProcessed(int numEigensProcessed) { + this.numEigensProcessed = numEigensProcessed; + } + + public List<Double> getCurrentEigenValues() { + return currentEigenValues; + } + + public void setCurrentEigenValues(List<Double> currentEigenValues) { + this.currentEigenValues = currentEigenValues; + } + + public Matrix getTrainingProjections() { + return trainingProjections; + } + + public void setTrainingProjections(Matrix trainingProjections) { + this.trainingProjections = 
trainingProjections; + } + + public int getTrainingIndex() { + return trainingIndex; + } + + public void setTrainingIndex(int trainingIndex) { + this.trainingIndex = trainingIndex; + } + + public Vector getHelperVector() { + return helperVector; + } + + public void setHelperVector(Vector helperVector) { + this.helperVector = helperVector; + } + + public boolean isFirstPass() { + return firstPass; + } + + public void setFirstPass(boolean firstPass) { + this.firstPass = firstPass; + } + + public List<EigenStatus> getStatusProgress() { + return statusProgress; + } + + public void setStatusProgress(List<EigenStatus> statusProgress) { + this.statusProgress = statusProgress; + } + + public double getActivationNumerator() { + return activationNumerator; + } + + public void setActivationNumerator(double activationNumerator) { + this.activationNumerator = activationNumerator; + } + + public double getActivationDenominatorSquared() { + return activationDenominatorSquared; + } + + public void setActivationDenominatorSquared(double activationDenominatorSquared) { + this.activationDenominatorSquared = activationDenominatorSquared; + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/decomposer/lanczos/LanczosSolver.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/decomposer/lanczos/LanczosSolver.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/decomposer/lanczos/LanczosSolver.java new file mode 100644 index 0000000..61a77db --- /dev/null +++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/decomposer/lanczos/LanczosSolver.java @@ -0,0 +1,213 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.math.decomposer.lanczos; + + +import java.util.EnumMap; +import java.util.Map; + +import com.google.common.base.Preconditions; +import org.apache.mahout.math.Matrix; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorIterable; +import org.apache.mahout.math.function.DoubleFunction; +import org.apache.mahout.math.function.PlusMult; +import org.apache.mahout.math.solver.EigenDecomposition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Simple implementation of the <a href="http://en.wikipedia.org/wiki/Lanczos_algorithm">Lanczos algorithm</a> for + * finding eigenvalues of a symmetric matrix, applied to non-symmetric matrices by applying Matrix.timesSquared(vector) + * as the "matrix-multiplication" method.<p> + * + * See the SSVD code for a better option + * {@link org.apache.mahout.math.ssvd.SequentialBigSvd} + * See also the docs on + * <a href=https://mahout.apache.org/users/dim-reduction/ssvd.html>stochastic + * projection SVD</a> + * <p> + * To avoid floating point overflow problems which arise in power-methods like Lanczos, an initial pass is made + * through the input matrix to + * <ul> + * <li>generate a good starting seed vector by summing all the rows of the input matrix, and</li> + * <li>compute the trace(inputMatrix<sup>t</sup>*matrix) + * </ul> + * <p> + * This latter value, being the sum of all of the singular values, is used to rescale the entire matrix, effectively + * forcing the largest singular value to be strictly less than one, and transforming floating point <em>overflow</em> + * problems into floating point <em>underflow</em> (ie, very small singular values will become invisible, as they + * will appear to be zero and the algorithm will terminate). + * <p>This implementation uses {@link EigenDecomposition} to do the + * eigenvalue extraction from the small (desiredRank x desiredRank) tridiagonal matrix. Numerical stability is + * achieved via brute-force: re-orthogonalization against all previous eigenvectors is computed after every pass. + * This can be made smarter if (when!) this proves to be a major bottleneck. Of course, this step can be parallelized + * as well. 
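+ * <p>Illustrative usage sketch (editorial, not part of this patch), using only the API visible in
+ * this commit -- given a {@code VectorIterable corpus}, a seed {@code Vector initialVector}, and a
+ * target {@code int desiredRank}:</p>
+ * <pre>
+ *   LanczosState state = new LanczosState(corpus, desiredRank, initialVector);
+ *   new LanczosSolver().solve(state, desiredRank, false);
+ *   Vector v0 = state.getRightSingularVector(0);  // top right-singular vector
+ *   double s0 = state.getSingularValue(0);        // corresponding singular value
+ * </pre>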
+ * @see org.apache.mahout.math.ssvd.SequentialBigSvd + */ +@Deprecated +public class LanczosSolver { + + private static final Logger log = LoggerFactory.getLogger(LanczosSolver.class); + + public static final double SAFE_MAX = 1.0e150; + + public enum TimingSection { + ITERATE, ORTHOGANLIZE, TRIDIAG_DECOMP, FINAL_EIGEN_CREATE + } + + private final Map<TimingSection, Long> startTimes = new EnumMap<>(TimingSection.class); + private final Map<TimingSection, Long> times = new EnumMap<>(TimingSection.class); + + private static final class Scale extends DoubleFunction { + private final double d; + + private Scale(double d) { + this.d = d; + } + + @Override + public double apply(double arg1) { + return arg1 * d; + } + } + + public void solve(LanczosState state, + int desiredRank) { + solve(state, desiredRank, false); + } + + public void solve(LanczosState state, + int desiredRank, + boolean isSymmetric) { + VectorIterable corpus = state.getCorpus(); + log.info("Finding {} singular vectors of matrix with {} rows, via Lanczos", + desiredRank, corpus.numRows()); + int i = state.getIterationNumber(); + Vector currentVector = state.getBasisVector(i - 1); + Vector previousVector = state.getBasisVector(i - 2); + double beta = 0; + Matrix triDiag = state.getDiagonalMatrix(); + while (i < desiredRank) { + startTime(TimingSection.ITERATE); + Vector nextVector = isSymmetric ? corpus.times(currentVector) : corpus.timesSquared(currentVector); + log.info("{} passes through the corpus so far...", i); + if (state.getScaleFactor() <= 0) { + state.setScaleFactor(calculateScaleFactor(nextVector)); + } + nextVector.assign(new Scale(1.0 / state.getScaleFactor())); + if (previousVector != null) { + nextVector.assign(previousVector, new PlusMult(-beta)); + } + // now orthogonalize + double alpha = currentVector.dot(nextVector); + nextVector.assign(currentVector, new PlusMult(-alpha)); + endTime(TimingSection.ITERATE); + startTime(TimingSection.ORTHOGANLIZE); + orthoganalizeAgainstAllButLast(nextVector, state); + endTime(TimingSection.ORTHOGANLIZE); + // and normalize + beta = nextVector.norm(2); + if (outOfRange(beta) || outOfRange(alpha)) { + log.warn("Lanczos parameters out of range: alpha = {}, beta = {}. Bailing out early!", + alpha, beta); + break; + } + nextVector.assign(new Scale(1 / beta)); + state.setBasisVector(i, nextVector); + previousVector = currentVector; + currentVector = nextVector; + // save the projections and norms! 
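+      // (Editorial note, not in the original patch.) alpha and beta are the diagonal and
+      // off-diagonal entries of the symmetric tridiagonal Lanczos matrix T; the
+      // EigenDecomposition of T below yields Ritz values and vectors that approximate
+      // the extreme eigenpairs of the scaled operator.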
+ triDiag.set(i - 1, i - 1, alpha); + if (i < desiredRank - 1) { + triDiag.set(i - 1, i, beta); + triDiag.set(i, i - 1, beta); + } + state.setIterationNumber(++i); + } + startTime(TimingSection.TRIDIAG_DECOMP); + + log.info("Lanczos iteration complete - now to diagonalize the tri-diagonal auxiliary matrix."); + // at this point, have tridiag all filled out, and basis is all filled out, and orthonormalized + EigenDecomposition decomp = new EigenDecomposition(triDiag); + + Matrix eigenVects = decomp.getV(); + Vector eigenVals = decomp.getRealEigenvalues(); + endTime(TimingSection.TRIDIAG_DECOMP); + startTime(TimingSection.FINAL_EIGEN_CREATE); + for (int row = 0; row < i; row++) { + Vector realEigen = null; + + Vector ejCol = eigenVects.viewColumn(row); + int size = Math.min(ejCol.size(), state.getBasisSize()); + for (int j = 0; j < size; j++) { + double d = ejCol.get(j); + Vector rowJ = state.getBasisVector(j); + if (realEigen == null) { + realEigen = rowJ.like(); + } + realEigen.assign(rowJ, new PlusMult(d)); + } + + Preconditions.checkState(realEigen != null); + assert realEigen != null; + + realEigen = realEigen.normalize(); + state.setRightSingularVector(row, realEigen); + double e = eigenVals.get(row) * state.getScaleFactor(); + if (!isSymmetric) { + e = Math.sqrt(e); + } + log.info("Eigenvector {} found with eigenvalue {}", row, e); + state.setSingularValue(row, e); + } + log.info("LanczosSolver finished."); + endTime(TimingSection.FINAL_EIGEN_CREATE); + } + + protected static double calculateScaleFactor(Vector nextVector) { + return nextVector.norm(2); + } + + private static boolean outOfRange(double d) { + return Double.isNaN(d) || d > SAFE_MAX || -d > SAFE_MAX; + } + + protected static void orthoganalizeAgainstAllButLast(Vector nextVector, LanczosState state) { + for (int i = 0; i < state.getIterationNumber(); i++) { + Vector basisVector = state.getBasisVector(i); + double alpha; + if (basisVector == null || (alpha = nextVector.dot(basisVector)) == 0.0) { + continue; + } + nextVector.assign(basisVector, new PlusMult(-alpha)); + } + } + + private void startTime(TimingSection section) { + startTimes.put(section, System.nanoTime()); + } + + private void endTime(TimingSection section) { + if (!times.containsKey(section)) { + times.put(section, 0L); + } + times.put(section, times.get(section) + System.nanoTime() - startTimes.get(section)); + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/decomposer/lanczos/LanczosState.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/decomposer/lanczos/LanczosState.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/decomposer/lanczos/LanczosState.java new file mode 100644 index 0000000..2ba34bd --- /dev/null +++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/decomposer/lanczos/LanczosState.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.math.decomposer.lanczos; + +import com.google.common.collect.Maps; +import org.apache.mahout.math.DenseMatrix; +import org.apache.mahout.math.Matrix; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorIterable; + +import java.util.Map; + +@Deprecated +public class LanczosState { + + protected Matrix diagonalMatrix; + protected final VectorIterable corpus; + protected double scaleFactor; + protected int iterationNumber; + protected final int desiredRank; + protected Map<Integer, Vector> basis; + protected final Map<Integer, Double> singularValues; + protected Map<Integer, Vector> singularVectors; + + public LanczosState(VectorIterable corpus, int desiredRank, Vector initialVector) { + this.corpus = corpus; + this.desiredRank = desiredRank; + intitializeBasisAndSingularVectors(); + setBasisVector(0, initialVector); + scaleFactor = 0; + diagonalMatrix = new DenseMatrix(desiredRank, desiredRank); + singularValues = Maps.newHashMap(); + iterationNumber = 1; + } + + private void intitializeBasisAndSingularVectors() { + basis = Maps.newHashMap(); + singularVectors = Maps.newHashMap(); + } + + public Matrix getDiagonalMatrix() { + return diagonalMatrix; + } + + public int getIterationNumber() { + return iterationNumber; + } + + public double getScaleFactor() { + return scaleFactor; + } + + public VectorIterable getCorpus() { + return corpus; + } + + public Vector getRightSingularVector(int i) { + return singularVectors.get(i); + } + + public Double getSingularValue(int i) { + return singularValues.get(i); + } + + public Vector getBasisVector(int i) { + return basis.get(i); + } + + public int getBasisSize() { + return basis.size(); + } + + public void setBasisVector(int i, Vector basisVector) { + basis.put(i, basisVector); + } + + public void setScaleFactor(double scale) { + scaleFactor = scale; + } + + public void setIterationNumber(int i) { + iterationNumber = i; + } + + public void setRightSingularVector(int i, Vector vector) { + singularVectors.put(i, vector); + } + + public void setSingularValue(int i, double value) { + singularValues.put(i, value); + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java new file mode 100644 index 0000000..de5e216 --- /dev/null +++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java @@ -0,0 +1,390 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.math.hadoop; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Iterator; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.Job; +import org.apache.mahout.common.Pair; +import org.apache.mahout.common.iterator.sequencefile.PathFilters; +import org.apache.mahout.common.iterator.sequencefile.PathType; +import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterator; +import org.apache.mahout.math.CardinalityException; +import org.apache.mahout.math.MatrixSlice; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorIterable; +import org.apache.mahout.math.VectorWritable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Function; +import com.google.common.collect.Iterators; + +/** + * DistributedRowMatrix is a FileSystem-backed VectorIterable in which the vectors live in a + * SequenceFile<WritableComparable,VectorWritable>, and distributed operations are executed as M/R passes on + * Hadoop. The usage is as follows: <p> + * <p> + * <pre> + * // the path must already contain an already created SequenceFile! + * DistributedRowMatrix m = new DistributedRowMatrix("path/to/vector/sequenceFile", "tmp/path", 10000000, 250000); + * m.setConf(new Configuration()); + * // now if we want to multiply a vector by this matrix, it's dimension must equal the row dimension of this + * // matrix. If we want to timesSquared() a vector by this matrix, its dimension must equal the column dimension + * // of the matrix. + * Vector v = new DenseVector(250000); + * // now the following operation will be done via a M/R pass via Hadoop. 
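+ * // (editorial note) timesSquared(v) computes A.transpose().times(A.times(v)), that is (A^t A) v,
+ * // in one map/reduce pass -- exactly what iterative eigensolvers on A^t A need.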
+ * Vector w = m.timesSquared(v); + * </pre> + * + */ +public class DistributedRowMatrix implements VectorIterable, Configurable { + public static final String KEEP_TEMP_FILES = "DistributedMatrix.keep.temp.files"; + + private static final Logger log = LoggerFactory.getLogger(DistributedRowMatrix.class); + + private final Path inputPath; + private final Path outputTmpPath; + private Configuration conf; + private Path rowPath; + private Path outputTmpBasePath; + private final int numRows; + private final int numCols; + private boolean keepTempFiles; + + public DistributedRowMatrix(Path inputPath, + Path outputTmpPath, + int numRows, + int numCols) { + this(inputPath, outputTmpPath, numRows, numCols, false); + } + + public DistributedRowMatrix(Path inputPath, + Path outputTmpPath, + int numRows, + int numCols, + boolean keepTempFiles) { + this.inputPath = inputPath; + this.outputTmpPath = outputTmpPath; + this.numRows = numRows; + this.numCols = numCols; + this.keepTempFiles = keepTempFiles; + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + try { + FileSystem fs = FileSystem.get(inputPath.toUri(), conf); + rowPath = fs.makeQualified(inputPath); + outputTmpBasePath = fs.makeQualified(outputTmpPath); + keepTempFiles = conf.getBoolean(KEEP_TEMP_FILES, false); + } catch (IOException ioe) { + throw new IllegalStateException(ioe); + } + } + + public Path getRowPath() { + return rowPath; + } + + public Path getOutputTempPath() { + return outputTmpBasePath; + } + + public void setOutputTempPathString(String outPathString) { + try { + outputTmpBasePath = FileSystem.get(conf).makeQualified(new Path(outPathString)); + } catch (IOException ioe) { + log.warn("Unable to set outputBasePath to {}, leaving as {}", + outPathString, outputTmpBasePath); + } + } + + @Override + public Iterator<MatrixSlice> iterateNonEmpty() { + return iterator(); + } + + @Override + public Iterator<MatrixSlice> iterateAll() { + try { + Path pathPattern = rowPath; + if (FileSystem.get(conf).getFileStatus(rowPath).isDir()) { + pathPattern = new Path(rowPath, "*"); + } + return Iterators.transform( + new SequenceFileDirIterator<IntWritable,VectorWritable>(pathPattern, + PathType.GLOB, + PathFilters.logsCRCFilter(), + null, + true, + conf), + new Function<Pair<IntWritable,VectorWritable>,MatrixSlice>() { + @Override + public MatrixSlice apply(Pair<IntWritable, VectorWritable> from) { + return new MatrixSlice(from.getSecond().get(), from.getFirst().get()); + } + }); + } catch (IOException ioe) { + throw new IllegalStateException(ioe); + } + } + + @Override + public int numSlices() { + return numRows(); + } + + @Override + public int numRows() { + return numRows; + } + + @Override + public int numCols() { + return numCols; + } + + + /** + * This implements matrix this.transpose().times(other) + * @param other a DistributedRowMatrix + * @return a DistributedRowMatrix containing the product + */ + public DistributedRowMatrix times(DistributedRowMatrix other) throws IOException { + return times(other, new Path(outputTmpBasePath.getParent(), "productWith-" + (System.nanoTime() & 0xFF))); + } + + /** + * This implements matrix this.transpose().times(other) + * @param other a DistributedRowMatrix + * @param outPath path to write result to + * @return a DistributedRowMatrix containing the product + */ + public DistributedRowMatrix times(DistributedRowMatrix other, Path outPath) throws IOException { + if (numRows != other.numRows()) { + throw 
new CardinalityException(numRows, other.numRows()); + } + + Configuration initialConf = getConf() == null ? new Configuration() : getConf(); + Configuration conf = + MatrixMultiplicationJob.createMatrixMultiplyJobConf(initialConf, + rowPath, + other.rowPath, + outPath, + other.numCols); + JobClient.runJob(new JobConf(conf)); + DistributedRowMatrix out = new DistributedRowMatrix(outPath, outputTmpPath, numCols, other.numCols()); + out.setConf(conf); + return out; + } + + public Vector columnMeans() throws IOException { + return columnMeans("SequentialAccessSparseVector"); + } + + /** + * Returns the column-wise mean of a DistributedRowMatrix + * + * @param vectorClass + * desired class for the column-wise mean vector e.g. + * RandomAccessSparseVector, DenseVector + * @return Vector containing the column-wise mean of this + */ + public Vector columnMeans(String vectorClass) throws IOException { + Path outputVectorTmpPath = + new Path(outputTmpBasePath, new Path(Long.toString(System.nanoTime()))); + Configuration initialConf = + getConf() == null ? new Configuration() : getConf(); + String vectorClassFull = "org.apache.mahout.math." + vectorClass; + Vector mean = MatrixColumnMeansJob.run(initialConf, rowPath, outputVectorTmpPath, vectorClassFull); + if (!keepTempFiles) { + FileSystem fs = outputVectorTmpPath.getFileSystem(conf); + fs.delete(outputVectorTmpPath, true); + } + return mean; + } + + public DistributedRowMatrix transpose() throws IOException { + Path outputPath = new Path(rowPath.getParent(), "transpose-" + (System.nanoTime() & 0xFF)); + Configuration initialConf = getConf() == null ? new Configuration() : getConf(); + Job transposeJob = TransposeJob.buildTransposeJob(initialConf, rowPath, outputPath, numRows); + + try { + transposeJob.waitForCompletion(true); + } catch (Exception e) { + throw new IllegalStateException("transposition failed", e); + } + + DistributedRowMatrix m = new DistributedRowMatrix(outputPath, outputTmpPath, numCols, numRows); + m.setConf(this.conf); + return m; + } + + @Override + public Vector times(Vector v) { + try { + Configuration initialConf = getConf() == null ? new Configuration() : getConf(); + Path outputVectorTmpPath = new Path(outputTmpBasePath, new Path(Long.toString(System.nanoTime()))); + + Job job = TimesSquaredJob.createTimesJob(initialConf, v, numRows, rowPath, outputVectorTmpPath); + + try { + job.waitForCompletion(true); + } catch (Exception e) { + throw new IllegalStateException("times failed", e); + } + + Vector result = TimesSquaredJob.retrieveTimesSquaredOutputVector(outputVectorTmpPath, conf); + if (!keepTempFiles) { + FileSystem fs = outputVectorTmpPath.getFileSystem(conf); + fs.delete(outputVectorTmpPath, true); + } + return result; + } catch (IOException ioe) { + throw new IllegalStateException(ioe); + } + } + + @Override + public Vector timesSquared(Vector v) { + try { + Configuration initialConf = getConf() == null ? 
new Configuration() : getConf(); + Path outputVectorTmpPath = new Path(outputTmpBasePath, new Path(Long.toString(System.nanoTime()))); + + Job job = TimesSquaredJob.createTimesSquaredJob(initialConf, v, rowPath, outputVectorTmpPath); + + try { + job.waitForCompletion(true); + } catch (Exception e) { + throw new IllegalStateException("timesSquared failed", e); + } + + Vector result = TimesSquaredJob.retrieveTimesSquaredOutputVector(outputVectorTmpPath, conf); + if (!keepTempFiles) { + FileSystem fs = outputVectorTmpPath.getFileSystem(conf); + fs.delete(outputVectorTmpPath, true); + } + return result; + } catch (IOException ioe) { + throw new IllegalStateException(ioe); + } + } + + @Override + public Iterator<MatrixSlice> iterator() { + return iterateAll(); + } + + public static class MatrixEntryWritable implements WritableComparable<MatrixEntryWritable> { + private int row; + private int col; + private double val; + + public int getRow() { + return row; + } + + public void setRow(int row) { + this.row = row; + } + + public int getCol() { + return col; + } + + public void setCol(int col) { + this.col = col; + } + + public double getVal() { + return val; + } + + public void setVal(double val) { + this.val = val; + } + + @Override + public int compareTo(MatrixEntryWritable o) { + if (row > o.row) { + return 1; + } else if (row < o.row) { + return -1; + } else { + if (col > o.col) { + return 1; + } else if (col < o.col) { + return -1; + } else { + return 0; + } + } + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof MatrixEntryWritable)) { + return false; + } + MatrixEntryWritable other = (MatrixEntryWritable) o; + return row == other.row && col == other.col; + } + + @Override + public int hashCode() { + return row + 31 * col; + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(row); + out.writeInt(col); + out.writeDouble(val); + } + + @Override + public void readFields(DataInput in) throws IOException { + row = in.readInt(); + col = in.readInt(); + val = in.readDouble(); + } + + @Override + public String toString() { + return "(" + row + ',' + col + "):" + val; + } + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/MatrixColumnMeansJob.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/MatrixColumnMeansJob.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/MatrixColumnMeansJob.java new file mode 100644 index 0000000..b4f459a --- /dev/null +++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/MatrixColumnMeansJob.java @@ -0,0 +1,236 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.mahout.math.hadoop; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.mahout.common.ClassUtils; +import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterator; +import org.apache.mahout.math.DenseVector; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorWritable; +import org.apache.mahout.math.function.Functions; + +import com.google.common.io.Closeables; + +/** + * MatrixColumnMeansJob is a job for calculating the column-wise mean of a + * DistributedRowMatrix. This job can be accessed using + * DistributedRowMatrix.columnMeans() + */ +public final class MatrixColumnMeansJob { + + public static final String VECTOR_CLASS = + "DistributedRowMatrix.columnMeans.vector.class"; + + private MatrixColumnMeansJob() { + } + + public static Vector run(Configuration conf, + Path inputPath, + Path outputVectorTmpPath) throws IOException { + return run(conf, inputPath, outputVectorTmpPath, null); + } + + /** + * Job for calculating column-wise mean of a DistributedRowMatrix + * + * @param initialConf + * @param inputPath + * path to DistributedRowMatrix input + * @param outputVectorTmpPath + * path for temporary files created during job + * @param vectorClass + * String of desired class for returned vector e.g. DenseVector, + * RandomAccessSparseVector (may be null for {@link DenseVector} ) + * @return Vector containing column-wise mean of DistributedRowMatrix + */ + public static Vector run(Configuration initialConf, + Path inputPath, + Path outputVectorTmpPath, + String vectorClass) throws IOException { + + try { + initialConf.set(VECTOR_CLASS, + vectorClass == null ? 
DenseVector.class.getName() + : vectorClass); + + Job job = new Job(initialConf, "MatrixColumnMeansJob"); + job.setJarByClass(MatrixColumnMeansJob.class); + + FileOutputFormat.setOutputPath(job, outputVectorTmpPath); + + outputVectorTmpPath.getFileSystem(job.getConfiguration()) + .delete(outputVectorTmpPath, true); + job.setNumReduceTasks(1); + FileOutputFormat.setOutputPath(job, outputVectorTmpPath); + FileInputFormat.addInputPath(job, inputPath); + job.setInputFormatClass(SequenceFileInputFormat.class); + job.setOutputFormatClass(SequenceFileOutputFormat.class); + FileOutputFormat.setOutputPath(job, outputVectorTmpPath); + + job.setMapperClass(MatrixColumnMeansMapper.class); + job.setReducerClass(MatrixColumnMeansReducer.class); + job.setMapOutputKeyClass(NullWritable.class); + job.setMapOutputValueClass(VectorWritable.class); + job.setOutputKeyClass(IntWritable.class); + job.setOutputValueClass(VectorWritable.class); + job.submit(); + job.waitForCompletion(true); + + Path tmpFile = new Path(outputVectorTmpPath, "part-r-00000"); + SequenceFileValueIterator<VectorWritable> iterator = + new SequenceFileValueIterator<>(tmpFile, true, initialConf); + try { + if (iterator.hasNext()) { + return iterator.next().get(); + } else { + return (Vector) Class.forName(vectorClass).getConstructor(int.class) + .newInstance(0); + } + } finally { + Closeables.close(iterator, true); + } + } catch (IOException ioe) { + throw ioe; + } catch (Throwable thr) { + throw new IOException(thr); + } + } + + /** + * Mapper for calculation of column-wise mean. + */ + public static class MatrixColumnMeansMapper extends + Mapper<Writable, VectorWritable, NullWritable, VectorWritable> { + + private Vector runningSum; + private String vectorClass; + + @Override + public void setup(Context context) { + vectorClass = context.getConfiguration().get(VECTOR_CLASS); + } + + /** + * The mapper computes a running sum of the vectors the task has seen. + * Element 0 of the running sum vector contains a count of the number of + * vectors that have been seen. The remaining elements contain the + * column-wise running sum. Nothing is written at this stage + */ + @Override + public void map(Writable r, VectorWritable v, Context context) + throws IOException { + if (runningSum == null) { + /* + * If this is the first vector the mapper has seen, instantiate a new + * vector using the parameter VECTOR_CLASS + */ + runningSum = ClassUtils.instantiateAs(vectorClass, + Vector.class, + new Class<?>[] { int.class }, + new Object[] { v.get().size() + 1 }); + runningSum.set(0, 1); + runningSum.viewPart(1, v.get().size()).assign(v.get()); + } else { + runningSum.set(0, runningSum.get(0) + 1); + runningSum.viewPart(1, v.get().size()).assign(v.get(), Functions.PLUS); + } + } + + /** + * The column-wise sum is written at the cleanup stage. A single reducer is + * forced so null can be used for the key + */ + @Override + public void cleanup(Context context) throws InterruptedException, + IOException { + if (runningSum != null) { + context.write(NullWritable.get(), new VectorWritable(runningSum)); + } + } + + } + + /** + * The reducer adds the partial column-wise sums from each of the mappers to + * compute the total column-wise sum. The total sum is then divided by the + * total count of vectors to determine the column-wise mean. 
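+ * <p>(Editorial clarification, not part of this patch.) Element 0 of each partial sum carries the
+ * count of rows seen by a mapper, and elements 1..n carry the column-wise sums, so the mean is
+ * recovered below as {@code viewPart(1, n).divide(get(0))}.</p>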
+ */ + public static class MatrixColumnMeansReducer extends + Reducer<NullWritable, VectorWritable, IntWritable, VectorWritable> { + + private static final IntWritable ONE = new IntWritable(1); + + private String vectorClass; + private Vector outputVector; + private final VectorWritable outputVectorWritable = new VectorWritable(); + + @Override + public void setup(Context context) { + vectorClass = context.getConfiguration().get(VECTOR_CLASS); + } + + @Override + public void reduce(NullWritable n, + Iterable<VectorWritable> vectors, + Context context) throws IOException, InterruptedException { + + /** + * Add together partial column-wise sums from mappers + */ + for (VectorWritable v : vectors) { + if (outputVector == null) { + outputVector = v.get(); + } else { + outputVector.assign(v.get(), Functions.PLUS); + } + } + + /** + * Divide total column-wise sum by count of vectors, which corresponds to + * the number of rows in the DistributedRowMatrix + */ + if (outputVector != null) { + outputVectorWritable.set(outputVector.viewPart(1, + outputVector.size() - 1) + .divide(outputVector.get(0))); + context.write(ONE, outputVectorWritable); + } else { + Vector emptyVector = ClassUtils.instantiateAs(vectorClass, + Vector.class, + new Class<?>[] { int.class }, + new Object[] { 0 }); + context.write(ONE, new VectorWritable(emptyVector)); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/MatrixMultiplicationJob.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/MatrixMultiplicationJob.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/MatrixMultiplicationJob.java new file mode 100644 index 0000000..48eda08 --- /dev/null +++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/MatrixMultiplicationJob.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.math.hadoop; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.SequenceFileInputFormat; +import org.apache.hadoop.mapred.SequenceFileOutputFormat; +import org.apache.hadoop.mapred.join.CompositeInputFormat; +import org.apache.hadoop.mapred.join.TupleWritable; +import org.apache.hadoop.util.ToolRunner; +import org.apache.mahout.common.AbstractJob; +import org.apache.mahout.math.RandomAccessSparseVector; +import org.apache.mahout.math.SequentialAccessSparseVector; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorWritable; +import org.apache.mahout.math.function.Functions; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * This still uses the old MR api and as with all things in Mahout that are MapReduce is now part of 'mahout-mr'. + * There is no plan to convert the old MR api used here to the new MR api. + * This will be replaced by the new Spark based Linear Algebra bindings. + */ + +public class MatrixMultiplicationJob extends AbstractJob { + + private static final String OUT_CARD = "output.vector.cardinality"; + + public static Configuration createMatrixMultiplyJobConf(Path aPath, + Path bPath, + Path outPath, + int outCardinality) { + return createMatrixMultiplyJobConf(new Configuration(), aPath, bPath, outPath, outCardinality); + } + + public static Configuration createMatrixMultiplyJobConf(Configuration initialConf, + Path aPath, + Path bPath, + Path outPath, + int outCardinality) { + JobConf conf = new JobConf(initialConf, MatrixMultiplicationJob.class); + conf.setInputFormat(CompositeInputFormat.class); + conf.set("mapred.join.expr", CompositeInputFormat.compose( + "inner", SequenceFileInputFormat.class, aPath, bPath)); + conf.setInt(OUT_CARD, outCardinality); + conf.setOutputFormat(SequenceFileOutputFormat.class); + FileOutputFormat.setOutputPath(conf, outPath); + conf.setMapperClass(MatrixMultiplyMapper.class); + conf.setCombinerClass(MatrixMultiplicationReducer.class); + conf.setReducerClass(MatrixMultiplicationReducer.class); + conf.setMapOutputKeyClass(IntWritable.class); + conf.setMapOutputValueClass(VectorWritable.class); + conf.setOutputKeyClass(IntWritable.class); + conf.setOutputValueClass(VectorWritable.class); + return conf; + } + + public static void main(String[] args) throws Exception { + ToolRunner.run(new MatrixMultiplicationJob(), args); + } + + @Override + public int run(String[] strings) throws Exception { + addOption("numRowsA", "nra", "Number of rows of the first input matrix", true); + addOption("numColsA", "nca", "Number of columns of the first input matrix", true); + addOption("numRowsB", "nrb", "Number of rows of the second input matrix", true); + + addOption("numColsB", "ncb", "Number of columns of the second input matrix", true); + addOption("inputPathA", "ia", "Path to the first input matrix", true); + addOption("inputPathB", "ib", "Path to the second input matrix", true); + + addOption("outputPath", "op", "Path to the output matrix", false); + + Map<String, List<String>> argMap = parseArguments(strings); + if 
(argMap == null) { + return -1; + } + + DistributedRowMatrix a = new DistributedRowMatrix(new Path(getOption("inputPathA")), + new Path(getOption("tempDir")), + Integer.parseInt(getOption("numRowsA")), + Integer.parseInt(getOption("numColsA"))); + DistributedRowMatrix b = new DistributedRowMatrix(new Path(getOption("inputPathB")), + new Path(getOption("tempDir")), + Integer.parseInt(getOption("numRowsB")), + Integer.parseInt(getOption("numColsB"))); + + a.setConf(new Configuration(getConf())); + b.setConf(new Configuration(getConf())); + + if (hasOption("outputPath")) { + a.times(b, new Path(getOption("outputPath"))); + } else { + a.times(b); + } + + return 0; + } + + public static class MatrixMultiplyMapper extends MapReduceBase + implements Mapper<IntWritable,TupleWritable,IntWritable,VectorWritable> { + + private int outCardinality; + private final IntWritable row = new IntWritable(); + + @Override + public void configure(JobConf conf) { + outCardinality = conf.getInt(OUT_CARD, Integer.MAX_VALUE); + } + + @Override + public void map(IntWritable index, + TupleWritable v, + OutputCollector<IntWritable,VectorWritable> out, + Reporter reporter) throws IOException { + boolean firstIsOutFrag = ((VectorWritable)v.get(0)).get().size() == outCardinality; + Vector outFrag = firstIsOutFrag ? ((VectorWritable)v.get(0)).get() : ((VectorWritable)v.get(1)).get(); + Vector multiplier = firstIsOutFrag ? ((VectorWritable)v.get(1)).get() : ((VectorWritable)v.get(0)).get(); + + VectorWritable outVector = new VectorWritable(); + for (Vector.Element e : multiplier.nonZeroes()) { + row.set(e.index()); + outVector.set(outFrag.times(e.get())); + out.collect(row, outVector); + } + } + } + + public static class MatrixMultiplicationReducer extends MapReduceBase + implements Reducer<IntWritable,VectorWritable,IntWritable,VectorWritable> { + + @Override + public void reduce(IntWritable rowNum, + Iterator<VectorWritable> it, + OutputCollector<IntWritable,VectorWritable> out, + Reporter reporter) throws IOException { + if (!it.hasNext()) { + return; + } + Vector accumulator = new RandomAccessSparseVector(it.next().get()); + while (it.hasNext()) { + Vector row = it.next().get(); + accumulator.assign(row, Functions.PLUS); + } + out.collect(rowNum, new VectorWritable(new SequentialAccessSparseVector(accumulator))); + } + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java new file mode 100644 index 0000000..e234eb9 --- /dev/null +++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java @@ -0,0 +1,251 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.math.hadoop; + +import com.google.common.io.Closeables; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.filecache.DistributedCache; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.mahout.common.HadoopUtil; +import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterator; +import org.apache.mahout.math.DenseVector; +import org.apache.mahout.math.RandomAccessSparseVector; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorWritable; +import org.apache.mahout.math.function.Functions; + +import com.google.common.base.Preconditions; + +import java.io.IOException; +import java.net.URI; + +public final class TimesSquaredJob { + + public static final String INPUT_VECTOR = "DistributedMatrix.times.inputVector"; + public static final String IS_SPARSE_OUTPUT = "DistributedMatrix.times.outputVector.sparse"; + public static final String OUTPUT_VECTOR_DIMENSION = "DistributedMatrix.times.output.dimension"; + + public static final String OUTPUT_VECTOR_FILENAME = "DistributedMatrix.times.outputVector"; + + private TimesSquaredJob() { } + + public static Job createTimesSquaredJob(Vector v, Path matrixInputPath, Path outputVectorPath) + throws IOException { + return createTimesSquaredJob(new Configuration(), v, matrixInputPath, outputVectorPath); + } + + public static Job createTimesSquaredJob(Configuration initialConf, Vector v, Path matrixInputPath, + Path outputVectorPath) throws IOException { + + return createTimesSquaredJob(initialConf, v, matrixInputPath, outputVectorPath, TimesSquaredMapper.class, + VectorSummingReducer.class); + } + + public static Job createTimesJob(Vector v, int outDim, Path matrixInputPath, Path outputVectorPath) + throws IOException { + + return createTimesJob(new Configuration(), v, outDim, matrixInputPath, outputVectorPath); + } + + public static Job createTimesJob(Configuration initialConf, Vector v, int outDim, Path matrixInputPath, + Path outputVectorPath) throws IOException { + + return createTimesSquaredJob(initialConf, v, outDim, matrixInputPath, outputVectorPath, TimesMapper.class, + VectorSummingReducer.class); + } + + public static Job createTimesSquaredJob(Vector v, Path matrixInputPath, Path outputVectorPathBase, + Class<? extends TimesSquaredMapper> mapClass, Class<? 
extends VectorSummingReducer> redClass) throws IOException { + + return createTimesSquaredJob(new Configuration(), v, matrixInputPath, outputVectorPathBase, mapClass, redClass); + } + + public static Job createTimesSquaredJob(Configuration initialConf, Vector v, Path matrixInputPath, + Path outputVectorPathBase, Class<? extends TimesSquaredMapper> mapClass, + Class<? extends VectorSummingReducer> redClass) throws IOException { + + return createTimesSquaredJob(initialConf, v, v.size(), matrixInputPath, outputVectorPathBase, mapClass, redClass); + } + + public static Job createTimesSquaredJob(Vector v, int outputVectorDim, Path matrixInputPath, + Path outputVectorPathBase, Class<? extends TimesSquaredMapper> mapClass, + Class<? extends VectorSummingReducer> redClass) throws IOException { + + return createTimesSquaredJob(new Configuration(), v, outputVectorDim, matrixInputPath, outputVectorPathBase, + mapClass, redClass); + } + + public static Job createTimesSquaredJob(Configuration initialConf, Vector v, int outputVectorDim, + Path matrixInputPath, Path outputVectorPathBase, Class<? extends TimesSquaredMapper> mapClass, + Class<? extends VectorSummingReducer> redClass) throws IOException { + + FileSystem fs = FileSystem.get(matrixInputPath.toUri(), initialConf); + matrixInputPath = fs.makeQualified(matrixInputPath); + outputVectorPathBase = fs.makeQualified(outputVectorPathBase); + + long now = System.nanoTime(); + Path inputVectorPath = new Path(outputVectorPathBase, INPUT_VECTOR + '/' + now); + + + SequenceFile.Writer inputVectorPathWriter = null; + + try { + inputVectorPathWriter = new SequenceFile.Writer(fs, initialConf, inputVectorPath, NullWritable.class, + VectorWritable.class); + inputVectorPathWriter.append(NullWritable.get(), new VectorWritable(v)); + } finally { + Closeables.close(inputVectorPathWriter, false); + } + + URI ivpURI = inputVectorPath.toUri(); + DistributedCache.setCacheFiles(new URI[] { ivpURI }, initialConf); + + Job job = HadoopUtil.prepareJob(matrixInputPath, new Path(outputVectorPathBase, OUTPUT_VECTOR_FILENAME), + SequenceFileInputFormat.class, mapClass, NullWritable.class, VectorWritable.class, redClass, + NullWritable.class, VectorWritable.class, SequenceFileOutputFormat.class, initialConf); + job.setCombinerClass(redClass); + job.setJobName("TimesSquaredJob: " + matrixInputPath); + + Configuration conf = job.getConfiguration(); + conf.set(INPUT_VECTOR, ivpURI.toString()); + conf.setBoolean(IS_SPARSE_OUTPUT, !v.isDense()); + conf.setInt(OUTPUT_VECTOR_DIMENSION, outputVectorDim); + + return job; + } + + public static Vector retrieveTimesSquaredOutputVector(Path outputVectorTmpPath, Configuration conf) + throws IOException { + Path outputFile = new Path(outputVectorTmpPath, OUTPUT_VECTOR_FILENAME + "/part-r-00000"); + SequenceFileValueIterator<VectorWritable> iterator = + new SequenceFileValueIterator<>(outputFile, true, conf); + try { + return iterator.next().get(); + } finally { + Closeables.close(iterator, true); + } + } + + public static class TimesSquaredMapper<T extends WritableComparable> + extends Mapper<T,VectorWritable, NullWritable,VectorWritable> { + + private Vector outputVector; + private Vector inputVector; + + Vector getOutputVector() { + return outputVector; + } + + @Override + protected void setup(Context ctx) throws IOException, InterruptedException { + try { + Configuration conf = ctx.getConfiguration(); + Path[] localFiles = DistributedCache.getLocalCacheFiles(conf); + Preconditions.checkArgument(localFiles != null && localFiles.length >= 1, 
+ "missing paths from the DistributedCache"); + + Path inputVectorPath = HadoopUtil.getSingleCachedFile(conf); + + SequenceFileValueIterator<VectorWritable> iterator = + new SequenceFileValueIterator<>(inputVectorPath, true, conf); + try { + inputVector = iterator.next().get(); + } finally { + Closeables.close(iterator, true); + } + + int outDim = conf.getInt(OUTPUT_VECTOR_DIMENSION, Integer.MAX_VALUE); + outputVector = conf.getBoolean(IS_SPARSE_OUTPUT, false) + ? new RandomAccessSparseVector(outDim, 10) + : new DenseVector(outDim); + } catch (IOException ioe) { + throw new IllegalStateException(ioe); + } + } + + @Override + protected void map(T key, VectorWritable v, Context context) throws IOException, InterruptedException { + + double d = scale(v); + if (d == 1.0) { + outputVector.assign(v.get(), Functions.PLUS); + } else if (d != 0.0) { + outputVector.assign(v.get(), Functions.plusMult(d)); + } + } + + protected double scale(VectorWritable v) { + return v.get().dot(inputVector); + } + + @Override + protected void cleanup(Context ctx) throws IOException, InterruptedException { + ctx.write(NullWritable.get(), new VectorWritable(outputVector)); + } + + } + + public static class TimesMapper extends TimesSquaredMapper<IntWritable> { + + + @Override + protected void map(IntWritable rowNum, VectorWritable v, Context context) throws IOException, InterruptedException { + double d = scale(v); + if (d != 0.0) { + getOutputVector().setQuick(rowNum.get(), d); + } + } + } + + public static class VectorSummingReducer extends Reducer<NullWritable,VectorWritable,NullWritable,VectorWritable> { + + private Vector outputVector; + + @Override + protected void setup(Context ctx) throws IOException, InterruptedException { + Configuration conf = ctx.getConfiguration(); + int outputDimension = conf.getInt(OUTPUT_VECTOR_DIMENSION, Integer.MAX_VALUE); + outputVector = conf.getBoolean(IS_SPARSE_OUTPUT, false) + ? new RandomAccessSparseVector(outputDimension, 10) + : new DenseVector(outputDimension); + } + + @Override + protected void reduce(NullWritable key, Iterable<VectorWritable> vectors, Context ctx) + throws IOException, InterruptedException { + + for (VectorWritable v : vectors) { + if (v != null) { + outputVector.assign(v.get(), Functions.PLUS); + } + } + ctx.write(NullWritable.get(), new VectorWritable(outputVector)); + } + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/TransposeJob.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/TransposeJob.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/TransposeJob.java new file mode 100644 index 0000000..60066c6 --- /dev/null +++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/TransposeJob.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.math.hadoop; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.hadoop.util.ToolRunner; +import org.apache.mahout.common.AbstractJob; +import org.apache.mahout.common.HadoopUtil; +import org.apache.mahout.common.mapreduce.MergeVectorsCombiner; +import org.apache.mahout.common.mapreduce.MergeVectorsReducer; +import org.apache.mahout.common.mapreduce.TransposeMapper; +import org.apache.mahout.math.VectorWritable; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** Transpose a matrix */ +public class TransposeJob extends AbstractJob { + + public static void main(String[] args) throws Exception { + ToolRunner.run(new TransposeJob(), args); + } + + @Override + public int run(String[] strings) throws Exception { + addInputOption(); + addOption("numRows", "nr", "Number of rows of the input matrix"); + addOption("numCols", "nc", "Number of columns of the input matrix"); + Map<String, List<String>> parsedArgs = parseArguments(strings); + if (parsedArgs == null) { + return -1; + } + + int numRows = Integer.parseInt(getOption("numRows")); + int numCols = Integer.parseInt(getOption("numCols")); + + DistributedRowMatrix matrix = new DistributedRowMatrix(getInputPath(), getTempPath(), numRows, numCols); + matrix.setConf(new Configuration(getConf())); + matrix.transpose(); + + return 0; + } + + public static Job buildTransposeJob(Path matrixInputPath, Path matrixOutputPath, int numInputRows) + throws IOException { + return buildTransposeJob(new Configuration(), matrixInputPath, matrixOutputPath, numInputRows); + } + + public static Job buildTransposeJob(Configuration initialConf, Path matrixInputPath, Path matrixOutputPath, + int numInputRows) throws IOException { + + Job job = HadoopUtil.prepareJob(matrixInputPath, matrixOutputPath, SequenceFileInputFormat.class, + TransposeMapper.class, IntWritable.class, VectorWritable.class, MergeVectorsReducer.class, IntWritable.class, + VectorWritable.class, SequenceFileOutputFormat.class, initialConf); + job.setCombinerClass(MergeVectorsCombiner.class); + job.getConfiguration().setInt(TransposeMapper.NEW_NUM_COLS_PARAM, numInputRows); + + job.setJobName("TransposeJob: " + matrixInputPath); + + return job; + } + + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/DistributedLanczosSolver.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/DistributedLanczosSolver.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/DistributedLanczosSolver.java new file mode 100644 index 0000000..86c0f6b --- /dev/null +++ 
b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/DistributedLanczosSolver.java @@ -0,0 +1,299 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.math.hadoop.decomposer; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import com.google.common.io.Closeables; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.mahout.common.AbstractJob; +import org.apache.mahout.math.DenseVector; +import org.apache.mahout.math.NamedVector; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorIterable; +import org.apache.mahout.math.VectorWritable; +import org.apache.mahout.math.decomposer.lanczos.LanczosSolver; +import org.apache.mahout.math.decomposer.lanczos.LanczosState; +import org.apache.mahout.math.hadoop.DistributedRowMatrix; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * See the SSVD code for a better option than using this: + * + * http://mahout.apache.org/users/dim-reduction/ssvd.html + * @see org.apache.mahout.math.hadoop.stochasticsvd.SSVDSolver + */ +@Deprecated +public class DistributedLanczosSolver extends LanczosSolver implements Tool { + + public static final String RAW_EIGENVECTORS = "rawEigenvectors"; + + private static final Logger log = LoggerFactory.getLogger(DistributedLanczosSolver.class); + + private Configuration conf; + + private Map<String, List<String>> parsedArgs; + + /** + * For the distributed case, the best guess at a useful initialization state for Lanczos is chosen to be + * uniform over all input dimensions, L_2 normalized.
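+ * That is, every component is set to 1.0 / sqrt(corpus.numCols()), giving the vector unit L_2 norm.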
+ */ + public static Vector getInitialVector(VectorIterable corpus) { + Vector initialVector = new DenseVector(corpus.numCols()); + initialVector.assign(1.0 / Math.sqrt(corpus.numCols())); + return initialVector; + } + + public LanczosState runJob(Configuration originalConfig, + LanczosState state, + int desiredRank, + boolean isSymmetric, + String outputEigenVectorPathString) throws IOException { + ((Configurable) state.getCorpus()).setConf(new Configuration(originalConfig)); + setConf(originalConfig); + solve(state, desiredRank, isSymmetric); + serializeOutput(state, new Path(outputEigenVectorPathString)); + return state; + } + + /** + * Factored-out LanczosSolver for the purpose of invoking it programmatically + */ + public LanczosState runJob(Configuration originalConfig, + Path inputPath, + Path outputTmpPath, + int numRows, + int numCols, + boolean isSymmetric, + int desiredRank, + String outputEigenVectorPathString) throws IOException { + DistributedRowMatrix matrix = new DistributedRowMatrix(inputPath, outputTmpPath, numRows, numCols); + matrix.setConf(new Configuration(originalConfig)); + LanczosState state = new LanczosState(matrix, desiredRank, getInitialVector(matrix)); + return runJob(originalConfig, state, desiredRank, isSymmetric, outputEigenVectorPathString); + } + + @Override + public int run(String[] strings) throws Exception { + Path inputPath = new Path(AbstractJob.getOption(parsedArgs, "--input")); + Path outputPath = new Path(AbstractJob.getOption(parsedArgs, "--output")); + Path outputTmpPath = new Path(AbstractJob.getOption(parsedArgs, "--tempDir")); + Path workingDirPath = AbstractJob.getOption(parsedArgs, "--workingDir") != null + ? new Path(AbstractJob.getOption(parsedArgs, "--workingDir")) : null; + int numRows = Integer.parseInt(AbstractJob.getOption(parsedArgs, "--numRows")); + int numCols = Integer.parseInt(AbstractJob.getOption(parsedArgs, "--numCols")); + boolean isSymmetric = Boolean.parseBoolean(AbstractJob.getOption(parsedArgs, "--symmetric")); + int desiredRank = Integer.parseInt(AbstractJob.getOption(parsedArgs, "--rank")); + + boolean cleansvd = Boolean.parseBoolean(AbstractJob.getOption(parsedArgs, "--cleansvd")); + if (cleansvd) { + double maxError = Double.parseDouble(AbstractJob.getOption(parsedArgs, "--maxError")); + double minEigenvalue = Double.parseDouble(AbstractJob.getOption(parsedArgs, "--minEigenvalue")); + boolean inMemory = Boolean.parseBoolean(AbstractJob.getOption(parsedArgs, "--inMemory")); + return run(inputPath, + outputPath, + outputTmpPath, + workingDirPath, + numRows, + numCols, + isSymmetric, + desiredRank, + maxError, + minEigenvalue, + inMemory); + } + return run(inputPath, outputPath, outputTmpPath, workingDirPath, numRows, numCols, isSymmetric, desiredRank); + } + + /** + * Run the solver to produce raw eigenvectors, then run the EigenVerificationJob to clean them + * + * @param inputPath the Path to the input corpus + * @param outputPath the Path to the output + * @param outputTmpPath a Path to a temporary working directory + * @param workingDirPath a Path to a working directory where Lanczos basis vectors are stored, or null to hold them in memory + * @param numRows the int number of rows + * @param numCols the int number of columns + * @param isSymmetric true if the input matrix is symmetric + * @param desiredRank the int desired rank of eigenvectors to produce + * @param maxError the maximum allowable error + * @param minEigenvalue the minimum usable eigenvalue + * @param inMemory true if the verification can be done in memory + * @return an int indicating success (0) or failure (non-zero) + */ + public int run(Path inputPath, + Path outputPath, + Path
outputTmpPath, + Path workingDirPath, + int numRows, + int numCols, + boolean isSymmetric, + int desiredRank, + double maxError, + double minEigenvalue, + boolean inMemory) throws Exception { + int result = run(inputPath, outputPath, outputTmpPath, workingDirPath, numRows, numCols, + isSymmetric, desiredRank); + if (result != 0) { + return result; + } + Path rawEigenVectorPath = new Path(outputPath, RAW_EIGENVECTORS); + return new EigenVerificationJob().run(inputPath, + rawEigenVectorPath, + outputPath, + outputTmpPath, + maxError, + minEigenvalue, + inMemory, + getConf() != null ? new Configuration(getConf()) : new Configuration()); + } + + /** + * Run the solver to produce the raw eigenvectors + * + * @param inputPath the Path to the input corpus + * @param outputPath the Path to the output + * @param outputTmpPath a Path to a temporary working directory + * @param workingDirPath a Path to a working directory where Lanczos basis vectors are stored, or null to hold them in memory + * @param numRows the int number of rows + * @param numCols the int number of columns + * @param isSymmetric true if the input matrix is symmetric + * @param desiredRank the int desired rank of eigenvectors to produce + * @return an int indicating success (0) or failure (non-zero) + */ + public int run(Path inputPath, + Path outputPath, + Path outputTmpPath, + Path workingDirPath, + int numRows, + int numCols, + boolean isSymmetric, + int desiredRank) throws Exception { + DistributedRowMatrix matrix = new DistributedRowMatrix(inputPath, outputTmpPath, numRows, numCols); + matrix.setConf(new Configuration(getConf() != null ? getConf() : new Configuration())); + + LanczosState state; + if (workingDirPath == null) { + state = new LanczosState(matrix, desiredRank, getInitialVector(matrix)); + } else { + HdfsBackedLanczosState hState = + new HdfsBackedLanczosState(matrix, desiredRank, getInitialVector(matrix), workingDirPath); + hState.setConf(matrix.getConf()); + state = hState; + } + solve(state, desiredRank, isSymmetric); + + Path outputEigenVectorPath = new Path(outputPath, RAW_EIGENVECTORS); + serializeOutput(state, outputEigenVectorPath); + return 0; + } + + /** + * @param state The final LanczosState to be serialized + * @param outputPath The path (relative to the current Configuration's FileSystem) to save the output to. + */ + public void serializeOutput(LanczosState state, Path outputPath) throws IOException { + int numEigenVectors = state.getIterationNumber(); + log.info("Persisting {} eigenVectors and eigenValues to: {}", numEigenVectors, outputPath); + Configuration conf = getConf() != null ? getConf() : new Configuration(); + FileSystem fs = FileSystem.get(outputPath.toUri(), conf); + SequenceFile.Writer seqWriter = + new SequenceFile.Writer(fs, conf, outputPath, IntWritable.class, VectorWritable.class); + try { + IntWritable iw = new IntWritable(); + for (int i = 0; i < numEigenVectors; i++) { + // Persist eigenvectors sorted by eigenvalues in descending order + NamedVector v = new NamedVector(state.getRightSingularVector(numEigenVectors - 1 - i), + "eigenVector" + i + ", eigenvalue = " + state.getSingularValue(numEigenVectors - 1 - i)); + Writable vw = new VectorWritable(v); + iw.set(i); + seqWriter.append(iw, vw); + } + } finally { + Closeables.close(seqWriter, false); + } + } + + @Override + public void setConf(Configuration configuration) { + conf = configuration; + } + + @Override + public Configuration getConf() { + return conf; + } + + public DistributedLanczosSolverJob job() { + return new DistributedLanczosSolverJob(); + } + + /** + * Inner subclass of AbstractJob so we get access to AbstractJob's functionality w.r.t.
cmdline options, but still + * subclass LanczosSolver. + */ + public class DistributedLanczosSolverJob extends AbstractJob { + @Override + public void setConf(Configuration conf) { + DistributedLanczosSolver.this.setConf(conf); + } + + @Override + public Configuration getConf() { + return DistributedLanczosSolver.this.getConf(); + } + + @Override + public int run(String[] args) throws Exception { + addInputOption(); + addOutputOption(); + addOption("numRows", "nr", "Number of rows of the input matrix"); + addOption("numCols", "nc", "Number of columns of the input matrix"); + addOption("rank", "r", "Desired decomposition rank (note: only roughly 1/4 to 1/3 " + + "of these will have the top portion of the spectrum)"); + addOption("symmetric", "sym", "Is the input matrix square and symmetric?"); + addOption("workingDir", "wd", "Working directory path to store Lanczos basis vectors " + + "(to be used on restarts, and to avoid too much RAM usage)"); + // options required to run cleansvd job + addOption("cleansvd", "cl", "Run the EigenVerificationJob to clean the eigenvectors after SVD", false); + addOption("maxError", "err", "Maximum acceptable error", "0.05"); + addOption("minEigenvalue", "mev", "Minimum eigenvalue to keep the vector for", "0.0"); + addOption("inMemory", "mem", "Buffer eigen matrix into memory (if you have enough!)", "false"); + + DistributedLanczosSolver.this.parsedArgs = parseArguments(args); + if (DistributedLanczosSolver.this.parsedArgs == null) { + return -1; + } else { + return DistributedLanczosSolver.this.run(args); + } + } + } + + public static void main(String[] args) throws Exception { + ToolRunner.run(new DistributedLanczosSolver().job(), args); + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVector.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVector.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVector.java new file mode 100644 index 0000000..d2f0c8c --- /dev/null +++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVector.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.math.hadoop.decomposer; + +import org.apache.mahout.math.DenseVector; +import org.apache.mahout.math.NamedVector; +import org.apache.mahout.math.Vector; + +import java.util.regex.Pattern; + +/** + * TODO this is a horrible hack. Make a proper writable subclass also.
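+ * The index, eigenvalue and cosine-angle error are packed into the vector's name (for example
+ * "e|2| = |4.2|, err = 0.05") and recovered from it by parseMetaData(CharSequence) below.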
+ */ +public class EigenVector extends NamedVector { + + private static final Pattern EQUAL_PATTERN = Pattern.compile(" = "); + private static final Pattern PIPE_PATTERN = Pattern.compile("\\|"); + + public EigenVector(Vector v, double eigenValue, double cosAngleError, int order) { + super(v instanceof DenseVector ? (DenseVector) v : new DenseVector(v), + "e|" + order + "| = |" + eigenValue + "|, err = " + cosAngleError); + } + + public double getEigenValue() { + return getEigenValue(getName()); + } + + public double getCosAngleError() { + return getCosAngleError(getName()); + } + + public int getIndex() { + return getIndex(getName()); + } + + public static double getEigenValue(CharSequence name) { + return parseMetaData(name)[1]; + } + + public static double getCosAngleError(CharSequence name) { + return parseMetaData(name)[2]; + } + + public static int getIndex(CharSequence name) { + return (int)parseMetaData(name)[0]; + } + + public static double[] parseMetaData(CharSequence name) { + double[] m = new double[3]; + String[] s = EQUAL_PATTERN.split(name); + m[0] = Double.parseDouble(PIPE_PATTERN.split(s[0])[1]); + m[1] = Double.parseDouble(PIPE_PATTERN.split(s[1])[1]); + m[2] = Double.parseDouble(s[2].substring(1)); + return m; + } + + protected double[] parseMetaData() { + return parseMetaData(getName()); + } + +}
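For illustration, here is a minimal driver sketch of how the TimesSquaredJob API above might be invoked end to end. It is not part of the commit: the class name TimesSquaredDriverSketch, the argument handling, and the uniform query vector are assumptions, and TimesSquaredJob is assumed to live in org.apache.mahout.math.hadoop alongside TransposeJob.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.hadoop.TimesSquaredJob;

public final class TimesSquaredDriverSketch {

  private TimesSquaredDriverSketch() {}

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path matrixPath = new Path(args[0]); // SequenceFile of <row id, VectorWritable> rows
    Path workPath = new Path(args[1]);   // base dir for the cached input vector and the output
    int numCols = Integer.parseInt(args[2]);

    // Illustrative query vector: uniform and L_2-normalized.
    Vector v = new DenseVector(numCols).assign(1.0 / Math.sqrt(numCols));

    // With TimesSquaredMapper every row r contributes (r dot v) * r, so the
    // reduced sum is A^T * (A * v); TimesMapper would instead record r dot v
    // per row id, i.e. A * v (with the output dimension set to the row count).
    Job job = TimesSquaredJob.createTimesSquaredJob(conf, v, matrixPath, workPath,
        TimesSquaredJob.TimesSquaredMapper.class, TimesSquaredJob.VectorSummingReducer.class);
    if (!job.waitForCompletion(true)) {
      throw new IllegalStateException("TimesSquaredJob failed");
    }

    Vector result = TimesSquaredJob.retrieveTimesSquaredOutputVector(workPath, conf);
    System.out.println("||A^T A v||_2 = " + result.norm(2));
  }
}

Passing workPath to both createTimesSquaredJob and retrieveTimesSquaredOutputVector matches the layout in the code above: the job writes under workPath/OUTPUT_VECTOR_FILENAME and the retriever reads the part-r-00000 file beneath it.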

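In the same spirit, a sketch of driving DistributedLanczosSolver programmatically via the runJob overload shown above; all paths and dimensions are placeholders, and the class itself is deprecated in favor of SSVDSolver, as its javadoc notes.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.mahout.math.hadoop.decomposer.DistributedLanczosSolver;

public final class LanczosDriverSketch {

  private LanczosDriverSketch() {}

  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // Placeholder locations and dimensions; the input is a SequenceFile-backed
    // DistributedRowMatrix of <IntWritable, VectorWritable> rows.
    Path input = new Path("/data/corpus-matrix");
    Path tmp = new Path("/tmp/lanczos-work");
    int numRows = 10000;
    int numCols = 500;
    int desiredRank = 50;

    // Runs the solver and serializes the raw eigenvectors to the given path;
    // per the --rank help text, only roughly the top 1/4 to 1/3 of the
    // produced vectors are expected to capture the top of the spectrum.
    new DistributedLanczosSolver().runJob(conf, input, tmp, numRows, numCols,
        false /* isSymmetric */, desiredRank,
        "/output/" + DistributedLanczosSolver.RAW_EIGENVECTORS);
  }
}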