http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/decomposer/hebbian/HebbianUpdater.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/decomposer/hebbian/HebbianUpdater.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/decomposer/hebbian/HebbianUpdater.java
new file mode 100644
index 0000000..2080c3a
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/decomposer/hebbian/HebbianUpdater.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.decomposer.hebbian;
+
+
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.function.PlusMult;
+
+public class HebbianUpdater implements EigenUpdater {
+
+  @Override
+  public void update(Vector pseudoEigen,
+                     Vector trainingVector,
+                     TrainingState currentState) {
+    double trainingVectorNorm = trainingVector.norm(2);
+    int numPreviousEigens = currentState.getNumEigensProcessed();
+    if (numPreviousEigens > 0 && currentState.isFirstPass()) {
+      updateTrainingProjectionsVector(currentState, trainingVector, numPreviousEigens - 1);
+    }
+    if (currentState.getActivationDenominatorSquared() == 0 || trainingVectorNorm == 0) {
+      if (currentState.getActivationDenominatorSquared() == 0) {
+        pseudoEigen.assign(trainingVector, new PlusMult(1));
+        currentState.setHelperVector(currentState.currentTrainingProjection().clone());
+        double helperNorm = currentState.getHelperVector().norm(2);
+        currentState.setActivationDenominatorSquared(trainingVectorNorm * trainingVectorNorm - helperNorm * helperNorm);
+      }
+      return;
+    }
+    currentState.setActivationNumerator(pseudoEigen.dot(trainingVector));
+    currentState.setActivationNumerator(
+        currentState.getActivationNumerator()
+            - currentState.getHelperVector().dot(currentState.currentTrainingProjection()));
+
+    double activation = currentState.getActivationNumerator()
+        / Math.sqrt(currentState.getActivationDenominatorSquared());
+    currentState.setActivationDenominatorSquared(
+        currentState.getActivationDenominatorSquared()
+            + 2 * activation * currentState.getActivationNumerator()
+            + activation * activation
+                * (trainingVector.getLengthSquared() - currentState.currentTrainingProjection().getLengthSquared()));
+    if (numPreviousEigens > 0) {
+      currentState.getHelperVector().assign(currentState.currentTrainingProjection(), new PlusMult(activation));
+    }
+    pseudoEigen.assign(trainingVector, new PlusMult(activation));
+  }
+
+  private static void updateTrainingProjectionsVector(TrainingState state,
+                                                      Vector trainingVector,
+                                                      int previousEigenIndex) {
+    Vector previousEigen = state.mostRecentEigen();
+    Vector currentTrainingVectorProjection = state.currentTrainingProjection();
+    double projection = previousEigen.dot(trainingVector);
+    currentTrainingVectorProjection.set(previousEigenIndex, projection);
+  }
+
+}
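
As a side note, not part of this commit: HebbianUpdater.update() is meant to be driven once per training vector (HebbianSolver in this package plays that role), with TrainingState tracking each training vector's projections onto the eigenvectors already found. A rough, package-local sketch of such a loop follows; the dimensions, the synthetic data, and the direct use of the package-private TrainingState constructor are assumptions made purely for illustration.

    package org.apache.mahout.math.decomposer.hebbian;

    import org.apache.mahout.math.DenseMatrix;
    import org.apache.mahout.math.DenseVector;
    import org.apache.mahout.math.Matrix;
    import org.apache.mahout.math.Vector;

    // Hypothetical driver sketch, same package as TrainingState (its constructor is package-private).
    public class HebbianUpdaterSketch {
      public static void main(String[] args) {
        int numRows = 100;     // number of training vectors
        int numCols = 50;      // dimensionality
        int desiredRank = 5;   // eigenvectors we eventually want

        Matrix corpus = new DenseMatrix(numRows, numCols);
        for (int r = 0; r < numRows; r++) {
          for (int c = 0; c < numCols; c++) {
            corpus.set(r, c, Math.sin(r * numCols + c));   // placeholder data
          }
        }

        // eigens: one row per eigenvector; projections: one row per training vector.
        Matrix eigens = new DenseMatrix(desiredRank, numCols);
        Matrix projections = new DenseMatrix(numRows, desiredRank);
        TrainingState state = new TrainingState(eigens, projections);

        EigenUpdater updater = new HebbianUpdater();
        Vector pseudoEigen = new DenseVector(numCols);
        for (int row = 0; row < numRows; row++) {
          state.setTrainingIndex(row);
          updater.update(pseudoEigen, corpus.viewRow(row), state);
        }
        // A real driver would normalize pseudoEigen once it converges, store it in eigens,
        // bump state.setNumEigensProcessed(...), and repeat for the next eigenvector.
      }
    }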

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/decomposer/hebbian/TrainingState.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/decomposer/hebbian/TrainingState.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/decomposer/hebbian/TrainingState.java
new file mode 100644
index 0000000..af6c2ef
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/decomposer/hebbian/TrainingState.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.decomposer.hebbian;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Matrix;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.decomposer.EigenStatus;
+
+public class TrainingState {
+
+  private Matrix currentEigens;
+  private int numEigensProcessed;
+  private List<Double> currentEigenValues;
+  private Matrix trainingProjections;
+  private int trainingIndex;
+  private Vector helperVector;
+  private boolean firstPass;
+  private List<EigenStatus> statusProgress;
+  private double activationNumerator;
+  private double activationDenominatorSquared;
+
+  TrainingState(Matrix eigens, Matrix projections) {
+    currentEigens = eigens;
+    trainingProjections = projections;
+    trainingIndex = 0;
+    helperVector = new DenseVector(eigens.numRows());
+    firstPass = true;
+    statusProgress = new ArrayList<>();
+    activationNumerator = 0;
+    activationDenominatorSquared = 0;
+    numEigensProcessed = 0;
+  }
+
+  public Vector mostRecentEigen() {
+    return currentEigens.viewRow(numEigensProcessed - 1);
+  }
+
+  public Vector currentTrainingProjection() {
+    if (trainingProjections.viewRow(trainingIndex) == null) {
+      trainingProjections.assignRow(trainingIndex, new DenseVector(currentEigens.numCols()));
+    }
+    return trainingProjections.viewRow(trainingIndex);
+  }
+
+  public Matrix getCurrentEigens() {
+    return currentEigens;
+  }
+
+  public void setCurrentEigens(Matrix currentEigens) {
+    this.currentEigens = currentEigens;
+  }
+
+  public int getNumEigensProcessed() {
+    return numEigensProcessed;
+  }
+
+  public void setNumEigensProcessed(int numEigensProcessed) {
+    this.numEigensProcessed = numEigensProcessed;
+  }
+
+  public List<Double> getCurrentEigenValues() {
+    return currentEigenValues;
+  }
+
+  public void setCurrentEigenValues(List<Double> currentEigenValues) {
+    this.currentEigenValues = currentEigenValues;
+  }
+
+  public Matrix getTrainingProjections() {
+    return trainingProjections;
+  }
+
+  public void setTrainingProjections(Matrix trainingProjections) {
+    this.trainingProjections = trainingProjections;
+  }
+
+  public int getTrainingIndex() {
+    return trainingIndex;
+  }
+
+  public void setTrainingIndex(int trainingIndex) {
+    this.trainingIndex = trainingIndex;
+  }
+
+  public Vector getHelperVector() {
+    return helperVector;
+  }
+
+  public void setHelperVector(Vector helperVector) {
+    this.helperVector = helperVector;
+  }
+
+  public boolean isFirstPass() {
+    return firstPass;
+  }
+
+  public void setFirstPass(boolean firstPass) {
+    this.firstPass = firstPass;
+  }
+
+  public List<EigenStatus> getStatusProgress() {
+    return statusProgress;
+  }
+
+  public void setStatusProgress(List<EigenStatus> statusProgress) {
+    this.statusProgress = statusProgress;
+  }
+
+  public double getActivationNumerator() {
+    return activationNumerator;
+  }
+
+  public void setActivationNumerator(double activationNumerator) {
+    this.activationNumerator = activationNumerator;
+  }
+
+  public double getActivationDenominatorSquared() {
+    return activationDenominatorSquared;
+  }
+
+  public void setActivationDenominatorSquared(double activationDenominatorSquared) {
+    this.activationDenominatorSquared = activationDenominatorSquared;
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/decomposer/lanczos/LanczosSolver.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/decomposer/lanczos/LanczosSolver.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/decomposer/lanczos/LanczosSolver.java
new file mode 100644
index 0000000..61a77db
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/decomposer/lanczos/LanczosSolver.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.decomposer.lanczos;
+
+
+import java.util.EnumMap;
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+import org.apache.mahout.math.Matrix;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorIterable;
+import org.apache.mahout.math.function.DoubleFunction;
+import org.apache.mahout.math.function.PlusMult;
+import org.apache.mahout.math.solver.EigenDecomposition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple implementation of the <a href="http://en.wikipedia.org/wiki/Lanczos_algorithm">Lanczos algorithm</a> for
+ * finding eigenvalues of a symmetric matrix, applied to non-symmetric matrices by applying Matrix.timesSquared(vector)
+ * as the "matrix-multiplication" method.<p>
+ *
+ * See the SSVD code for a better option
+ * {@link org.apache.mahout.math.ssvd.SequentialBigSvd}
+ * See also the docs on
+ * <a href=https://mahout.apache.org/users/dim-reduction/ssvd.html>stochastic
+ * projection SVD</a>
+ * <p>
+ * To avoid floating point overflow problems which arise in power-methods like Lanczos, an initial pass is made
+ * through the input matrix to
+ * <ul>
+ *   <li>generate a good starting seed vector by summing all the rows of the input matrix, and</li>
+ *   <li>compute the trace(inputMatrix<sup>t</sup>*matrix)
+ * </ul>
+ * <p>
+ * This latter value, being the sum of all of the singular values, is used to rescale the entire matrix, effectively
+ * forcing the largest singular value to be strictly less than one, and transforming floating point <em>overflow</em>
+ * problems into floating point <em>underflow</em> (ie, very small singular values will become invisible, as they
+ * will appear to be zero and the algorithm will terminate).
+ * <p>This implementation uses {@link EigenDecomposition} to do the
+ * eigenvalue extraction from the small (desiredRank x desiredRank) tridiagonal matrix.  Numerical stability is
+ * achieved via brute-force: re-orthogonalization against all previous eigenvectors is computed after every pass.
+ * This can be made smarter if (when!) this proves to be a major bottleneck.  Of course, this step can be parallelized
+ * as well.
+ * @see org.apache.mahout.math.ssvd.SequentialBigSvd
+ */
+@Deprecated
+public class LanczosSolver {
+
+  private static final Logger log = LoggerFactory.getLogger(LanczosSolver.class);
+
+  public static final double SAFE_MAX = 1.0e150;
+
+  public enum TimingSection {
+    ITERATE, ORTHOGANLIZE, TRIDIAG_DECOMP, FINAL_EIGEN_CREATE
+  }
+
+  private final Map<TimingSection, Long> startTimes = new EnumMap<>(TimingSection.class);
+  private final Map<TimingSection, Long> times = new EnumMap<>(TimingSection.class);
+
+  private static final class Scale extends DoubleFunction {
+    private final double d;
+
+    private Scale(double d) {
+      this.d = d;
+    }
+
+    @Override
+    public double apply(double arg1) {
+      return arg1 * d;
+    }
+  }
+
+  public void solve(LanczosState state,
+                    int desiredRank) {
+    solve(state, desiredRank, false);
+  }
+
+  public void solve(LanczosState state,
+                    int desiredRank,
+                    boolean isSymmetric) {
+    VectorIterable corpus = state.getCorpus();
+    log.info("Finding {} singular vectors of matrix with {} rows, via Lanczos",
+        desiredRank, corpus.numRows());
+    int i = state.getIterationNumber();
+    Vector currentVector = state.getBasisVector(i - 1);
+    Vector previousVector = state.getBasisVector(i - 2);
+    double beta = 0;
+    Matrix triDiag = state.getDiagonalMatrix();
+    while (i < desiredRank) {
+      startTime(TimingSection.ITERATE);
+      Vector nextVector = isSymmetric ? corpus.times(currentVector) : corpus.timesSquared(currentVector);
+      log.info("{} passes through the corpus so far...", i);
+      if (state.getScaleFactor() <= 0) {
+        state.setScaleFactor(calculateScaleFactor(nextVector));
+      }
+      nextVector.assign(new Scale(1.0 / state.getScaleFactor()));
+      if (previousVector != null) {
+        nextVector.assign(previousVector, new PlusMult(-beta));
+      }
+      // now orthogonalize
+      double alpha = currentVector.dot(nextVector);
+      nextVector.assign(currentVector, new PlusMult(-alpha));
+      endTime(TimingSection.ITERATE);
+      startTime(TimingSection.ORTHOGANLIZE);
+      orthoganalizeAgainstAllButLast(nextVector, state);
+      endTime(TimingSection.ORTHOGANLIZE);
+      // and normalize
+      beta = nextVector.norm(2);
+      if (outOfRange(beta) || outOfRange(alpha)) {
+        log.warn("Lanczos parameters out of range: alpha = {}, beta = {}.  
Bailing out early!",
+            alpha, beta);
+        break;
+      }
+      nextVector.assign(new Scale(1 / beta));
+      state.setBasisVector(i, nextVector);
+      previousVector = currentVector;
+      currentVector = nextVector;
+      // save the projections and norms!
+      triDiag.set(i - 1, i - 1, alpha);
+      if (i < desiredRank - 1) {
+        triDiag.set(i - 1, i, beta);
+        triDiag.set(i, i - 1, beta);
+      }
+      state.setIterationNumber(++i);
+    }
+    startTime(TimingSection.TRIDIAG_DECOMP);
+
+    log.info("Lanczos iteration complete - now to diagonalize the tri-diagonal 
auxiliary matrix.");
+    // at this point, have tridiag all filled out, and basis is all filled 
out, and orthonormalized
+    EigenDecomposition decomp = new EigenDecomposition(triDiag);
+
+    Matrix eigenVects = decomp.getV();
+    Vector eigenVals = decomp.getRealEigenvalues();
+    endTime(TimingSection.TRIDIAG_DECOMP);
+    startTime(TimingSection.FINAL_EIGEN_CREATE);
+    for (int row = 0; row < i; row++) {
+      Vector realEigen = null;
+
+      Vector ejCol = eigenVects.viewColumn(row);
+      int size = Math.min(ejCol.size(), state.getBasisSize());
+      for (int j = 0; j < size; j++) {
+        double d = ejCol.get(j);
+        Vector rowJ = state.getBasisVector(j);
+        if (realEigen == null) {
+          realEigen = rowJ.like();
+        }
+        realEigen.assign(rowJ, new PlusMult(d));
+      }
+
+      Preconditions.checkState(realEigen != null);
+      assert realEigen != null;
+
+      realEigen = realEigen.normalize();
+      state.setRightSingularVector(row, realEigen);
+      double e = eigenVals.get(row) * state.getScaleFactor();
+      if (!isSymmetric) {
+        e = Math.sqrt(e);
+      }
+      log.info("Eigenvector {} found with eigenvalue {}", row, e);
+      state.setSingularValue(row, e);
+    }
+    log.info("LanczosSolver finished.");
+    endTime(TimingSection.FINAL_EIGEN_CREATE);
+  }
+
+  protected static double calculateScaleFactor(Vector nextVector) {
+    return nextVector.norm(2);
+  }
+
+  private static boolean outOfRange(double d) {
+    return Double.isNaN(d) || d > SAFE_MAX || -d > SAFE_MAX;
+  }
+
+  protected static void orthoganalizeAgainstAllButLast(Vector nextVector, LanczosState state) {
+    for (int i = 0; i < state.getIterationNumber(); i++) {
+      Vector basisVector = state.getBasisVector(i);
+      double alpha;
+      if (basisVector == null || (alpha = nextVector.dot(basisVector)) == 0.0) {
+        continue;
+      }
+      nextVector.assign(basisVector, new PlusMult(-alpha));
+    }
+  }
+
+  private void startTime(TimingSection section) {
+    startTimes.put(section, System.nanoTime());
+  }
+
+  private void endTime(TimingSection section) {
+    if (!times.containsKey(section)) {
+      times.put(section, 0L);
+    }
+    times.put(section, times.get(section) + System.nanoTime() - startTimes.get(section));
+  }
+
+}
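
For orientation, not part of the commit itself: the solver is driven through LanczosState, seeded with an initial vector, and then solve() is called with the desired rank. A minimal sketch follows; the in-memory DenseMatrix corpus, the synthetic data, and the row-sum seed are assumptions chosen for brevity (in practice the corpus is usually something like a DistributedRowMatrix).

    import org.apache.mahout.math.DenseMatrix;
    import org.apache.mahout.math.DenseVector;
    import org.apache.mahout.math.Matrix;
    import org.apache.mahout.math.Vector;
    import org.apache.mahout.math.decomposer.lanczos.LanczosSolver;
    import org.apache.mahout.math.decomposer.lanczos.LanczosState;
    import org.apache.mahout.math.function.Functions;

    public class LanczosSketch {
      public static void main(String[] args) {
        int rows = 200;
        int cols = 100;
        int desiredRank = 10;

        Matrix corpus = new DenseMatrix(rows, cols);   // any VectorIterable would do
        for (int r = 0; r < rows; r++) {
          for (int c = 0; c < cols; c++) {
            corpus.set(r, c, Math.cos(r) * Math.sin(c));   // placeholder data
          }
        }

        // Seed with the normalized sum of all rows, as the class javadoc suggests.
        Vector seed = new DenseVector(cols);
        for (int r = 0; r < rows; r++) {
          seed.assign(corpus.viewRow(r), Functions.PLUS);
        }
        seed = seed.normalize();

        LanczosState state = new LanczosState(corpus, desiredRank, seed);
        new LanczosSolver().solve(state, desiredRank, false);   // false: treat the corpus as non-symmetric

        for (int i = 0; i < desiredRank; i++) {
          if (state.getSingularValue(i) != null) {
            System.out.println("singular value " + i + " = " + state.getSingularValue(i));
          }
        }
      }
    }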

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/decomposer/lanczos/LanczosState.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/decomposer/lanczos/LanczosState.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/decomposer/lanczos/LanczosState.java
new file mode 100644
index 0000000..2ba34bd
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/decomposer/lanczos/LanczosState.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.decomposer.lanczos;
+
+import com.google.common.collect.Maps;
+import org.apache.mahout.math.DenseMatrix;
+import org.apache.mahout.math.Matrix;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorIterable;
+
+import java.util.Map;
+
+@Deprecated
+public class LanczosState {
+
+  protected  Matrix diagonalMatrix;
+  protected final VectorIterable corpus;
+  protected double scaleFactor;
+  protected int iterationNumber;
+  protected final int desiredRank;
+  protected Map<Integer, Vector> basis;
+  protected final Map<Integer, Double> singularValues;
+  protected Map<Integer, Vector> singularVectors;
+
+  public LanczosState(VectorIterable corpus, int desiredRank, Vector initialVector) {
+    this.corpus = corpus;
+    this.desiredRank = desiredRank;
+    intitializeBasisAndSingularVectors();
+    setBasisVector(0, initialVector);
+    scaleFactor = 0;
+    diagonalMatrix = new DenseMatrix(desiredRank, desiredRank);
+    singularValues = Maps.newHashMap();
+    iterationNumber = 1;
+  }
+
+  private void intitializeBasisAndSingularVectors() {
+    basis = Maps.newHashMap();
+    singularVectors = Maps.newHashMap();
+  }
+
+  public Matrix getDiagonalMatrix() {
+    return diagonalMatrix;
+  }
+
+  public int getIterationNumber() {
+    return iterationNumber;
+  }
+
+  public double getScaleFactor() {
+    return scaleFactor;
+  }
+
+  public VectorIterable getCorpus() {
+    return corpus;
+  }
+
+  public Vector getRightSingularVector(int i) {
+    return singularVectors.get(i);
+  }
+
+  public Double getSingularValue(int i) {
+    return singularValues.get(i);
+  }
+
+  public Vector getBasisVector(int i) {
+    return basis.get(i);
+  }
+
+  public int getBasisSize() {
+    return basis.size();
+  }
+
+  public void setBasisVector(int i, Vector basisVector) {
+    basis.put(i, basisVector);
+  }
+
+  public void setScaleFactor(double scale) {
+    scaleFactor = scale;
+  }
+
+  public void setIterationNumber(int i) {
+    iterationNumber = i;
+  }
+
+  public void setRightSingularVector(int i, Vector vector) {
+    singularVectors.put(i, vector);
+  }
+
+  public void setSingularValue(int i, double value) {
+    singularValues.put(i, value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java
new file mode 100644
index 0000000..de5e216
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java
@@ -0,0 +1,390 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterator;
+import org.apache.mahout.math.CardinalityException;
+import org.apache.mahout.math.MatrixSlice;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorIterable;
+import org.apache.mahout.math.VectorWritable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+
+/**
+ * DistributedRowMatrix is a FileSystem-backed VectorIterable in which the vectors live in a
+ * SequenceFile<WritableComparable,VectorWritable>, and distributed operations are executed as M/R passes on
+ * Hadoop.  The usage is as follows: <p>
+ * <p>
+ * <pre>
+ *   // the path must already contain an already created SequenceFile!
+ *   DistributedRowMatrix m = new DistributedRowMatrix("path/to/vector/sequenceFile", "tmp/path", 10000000, 250000);
+ *   m.setConf(new Configuration());
+ *   // now if we want to multiply a vector by this matrix, its dimension must equal the row dimension of this
+ *   // matrix.  If we want to timesSquared() a vector by this matrix, its dimension must equal the column dimension
+ *   // of the matrix.
+ *   Vector v = new DenseVector(250000);
+ *   // now the following operation will be done via a M/R pass via Hadoop.
+ *   Vector w = m.timesSquared(v);
+ * </pre>
+ *
+ */
+public class DistributedRowMatrix implements VectorIterable, Configurable {
+  public static final String KEEP_TEMP_FILES = "DistributedMatrix.keep.temp.files";
+
+  private static final Logger log = LoggerFactory.getLogger(DistributedRowMatrix.class);
+
+  private final Path inputPath;
+  private final Path outputTmpPath;
+  private Configuration conf;
+  private Path rowPath;
+  private Path outputTmpBasePath;
+  private final int numRows;
+  private final int numCols;
+  private boolean keepTempFiles;
+
+  public DistributedRowMatrix(Path inputPath,
+                              Path outputTmpPath,
+                              int numRows,
+                              int numCols) {
+    this(inputPath, outputTmpPath, numRows, numCols, false);
+  }
+
+  public DistributedRowMatrix(Path inputPath,
+                              Path outputTmpPath,
+                              int numRows,
+                              int numCols,
+                              boolean keepTempFiles) {
+    this.inputPath = inputPath;
+    this.outputTmpPath = outputTmpPath;
+    this.numRows = numRows;
+    this.numCols = numCols;
+    this.keepTempFiles = keepTempFiles;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    try {
+      FileSystem fs = FileSystem.get(inputPath.toUri(), conf);
+      rowPath = fs.makeQualified(inputPath);
+      outputTmpBasePath = fs.makeQualified(outputTmpPath);
+      keepTempFiles = conf.getBoolean(KEEP_TEMP_FILES, false);
+    } catch (IOException ioe) {
+      throw new IllegalStateException(ioe);
+    }
+  }
+
+  public Path getRowPath() {
+    return rowPath;
+  }
+
+  public Path getOutputTempPath() {
+    return outputTmpBasePath;
+  }
+
+  public void setOutputTempPathString(String outPathString) {
+    try {
+      outputTmpBasePath = FileSystem.get(conf).makeQualified(new Path(outPathString));
+    } catch (IOException ioe) {
+      log.warn("Unable to set outputBasePath to {}, leaving as {}",
+          outPathString, outputTmpBasePath);
+    }
+  }
+
+  @Override
+  public Iterator<MatrixSlice> iterateNonEmpty() {
+    return iterator();
+  }
+
+  @Override
+  public Iterator<MatrixSlice> iterateAll() {
+    try {
+      Path pathPattern = rowPath;
+      if (FileSystem.get(conf).getFileStatus(rowPath).isDir()) {
+        pathPattern = new Path(rowPath, "*");
+      }
+      return Iterators.transform(
+          new SequenceFileDirIterator<IntWritable,VectorWritable>(pathPattern,
+                                                                  PathType.GLOB,
+                                                                  PathFilters.logsCRCFilter(),
+                                                                  null,
+                                                                  true,
+                                                                  conf),
+          new Function<Pair<IntWritable,VectorWritable>,MatrixSlice>() {
+            @Override
+            public MatrixSlice apply(Pair<IntWritable, VectorWritable> from) {
+              return new MatrixSlice(from.getSecond().get(), from.getFirst().get());
+            }
+          });
+    } catch (IOException ioe) {
+      throw new IllegalStateException(ioe);
+    }
+  }
+
+  @Override
+  public int numSlices() {
+    return numRows();
+  }
+
+  @Override
+  public int numRows() {
+    return numRows;
+  }
+
+  @Override
+  public int numCols() {
+    return numCols;
+  }
+
+
+  /**
+   * This implements matrix this.transpose().times(other)
+   * @param other   a DistributedRowMatrix
+   * @return    a DistributedRowMatrix containing the product
+   */
+  public DistributedRowMatrix times(DistributedRowMatrix other) throws IOException {
+    return times(other, new Path(outputTmpBasePath.getParent(), "productWith-" + (System.nanoTime() & 0xFF)));
+  }
+
+  /**
+   * This implements matrix this.transpose().times(other)
+   * @param other   a DistributedRowMatrix
+   * @param outPath path to write result to
+   * @return    a DistributedRowMatrix containing the product
+   */
+  public DistributedRowMatrix times(DistributedRowMatrix other, Path outPath) throws IOException {
+    if (numRows != other.numRows()) {
+      throw new CardinalityException(numRows, other.numRows());
+    }
+
+    Configuration initialConf = getConf() == null ? new Configuration() : getConf();
+    Configuration conf =
+        MatrixMultiplicationJob.createMatrixMultiplyJobConf(initialConf,
+                                                            rowPath,
+                                                            other.rowPath,
+                                                            outPath,
+                                                            other.numCols);
+    JobClient.runJob(new JobConf(conf));
+    DistributedRowMatrix out = new DistributedRowMatrix(outPath, outputTmpPath, numCols, other.numCols());
+    out.setConf(conf);
+    return out;
+  }
+
+  public Vector columnMeans() throws IOException {
+    return columnMeans("SequentialAccessSparseVector");
+  }
+
+  /**
+   * Returns the column-wise mean of a DistributedRowMatrix
+   *
+   * @param vectorClass
+   *          desired class for the column-wise mean vector e.g.
+   *          RandomAccessSparseVector, DenseVector
+   * @return Vector containing the column-wise mean of this
+   */
+  public Vector columnMeans(String vectorClass) throws IOException {
+    Path outputVectorTmpPath =
+        new Path(outputTmpBasePath, new Path(Long.toString(System.nanoTime())));
+    Configuration initialConf =
+        getConf() == null ? new Configuration() : getConf();
+    String vectorClassFull = "org.apache.mahout.math." + vectorClass;
+    Vector mean = MatrixColumnMeansJob.run(initialConf, rowPath, outputVectorTmpPath, vectorClassFull);
+    if (!keepTempFiles) {
+      FileSystem fs = outputVectorTmpPath.getFileSystem(conf);
+      fs.delete(outputVectorTmpPath, true);
+    }
+    return mean;
+  }
+
+  public DistributedRowMatrix transpose() throws IOException {
+    Path outputPath = new Path(rowPath.getParent(), "transpose-" + 
(System.nanoTime() & 0xFF));
+    Configuration initialConf = getConf() == null ? new Configuration() : 
getConf();
+    Job transposeJob = TransposeJob.buildTransposeJob(initialConf, rowPath, 
outputPath, numRows);
+
+    try {
+      transposeJob.waitForCompletion(true);
+    } catch (Exception e) {
+      throw new IllegalStateException("transposition failed", e);
+    }
+
+    DistributedRowMatrix m = new DistributedRowMatrix(outputPath, outputTmpPath, numCols, numRows);
+    m.setConf(this.conf);
+    return m;
+  }
+
+  @Override
+  public Vector times(Vector v) {
+    try {
+      Configuration initialConf = getConf() == null ? new Configuration() : getConf();
+      Path outputVectorTmpPath = new Path(outputTmpBasePath, new Path(Long.toString(System.nanoTime())));
+
+      Job job = TimesSquaredJob.createTimesJob(initialConf, v, numRows, rowPath, outputVectorTmpPath);
+
+      try {
+        job.waitForCompletion(true);
+      } catch (Exception e) {
+        throw new IllegalStateException("times failed", e);
+      }
+
+      Vector result = TimesSquaredJob.retrieveTimesSquaredOutputVector(outputVectorTmpPath, conf);
+      if (!keepTempFiles) {
+        FileSystem fs = outputVectorTmpPath.getFileSystem(conf);
+        fs.delete(outputVectorTmpPath, true);
+      }
+      return result;
+    } catch (IOException ioe) {
+      throw new IllegalStateException(ioe);
+    }
+  }
+
+  @Override
+  public Vector timesSquared(Vector v) {
+    try {
+      Configuration initialConf = getConf() == null ? new Configuration() : getConf();
+      Path outputVectorTmpPath = new Path(outputTmpBasePath, new Path(Long.toString(System.nanoTime())));
+
+      Job job = TimesSquaredJob.createTimesSquaredJob(initialConf, v, rowPath, outputVectorTmpPath);
+
+      try {
+        job.waitForCompletion(true);
+      } catch (Exception e) {
+        throw new IllegalStateException("timesSquared failed", e);
+      }
+
+      Vector result = TimesSquaredJob.retrieveTimesSquaredOutputVector(outputVectorTmpPath, conf);
+      if (!keepTempFiles) {
+        FileSystem fs = outputVectorTmpPath.getFileSystem(conf);
+        fs.delete(outputVectorTmpPath, true);
+      }
+      return result;
+    } catch (IOException ioe) {
+      throw new IllegalStateException(ioe);
+    }
+  }
+
+  @Override
+  public Iterator<MatrixSlice> iterator() {
+    return iterateAll();
+  }
+
+  public static class MatrixEntryWritable implements WritableComparable<MatrixEntryWritable> {
+    private int row;
+    private int col;
+    private double val;
+
+    public int getRow() {
+      return row;
+    }
+
+    public void setRow(int row) {
+      this.row = row;
+    }
+
+    public int getCol() {
+      return col;
+    }
+
+    public void setCol(int col) {
+      this.col = col;
+    }
+
+    public double getVal() {
+      return val;
+    }
+
+    public void setVal(double val) {
+      this.val = val;
+    }
+
+    @Override
+    public int compareTo(MatrixEntryWritable o) {
+      if (row > o.row) {
+        return 1;
+      } else if (row < o.row) {
+        return -1;
+      } else {
+        if (col > o.col) {
+          return 1;
+        } else if (col < o.col) {
+          return -1;
+        } else {
+          return 0;
+        }
+      }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (!(o instanceof MatrixEntryWritable)) {
+        return false;
+      }
+      MatrixEntryWritable other = (MatrixEntryWritable) o;
+      return row == other.row && col == other.col;
+    }
+
+    @Override
+    public int hashCode() {
+      return row + 31 * col;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeInt(row);
+      out.writeInt(col);
+      out.writeDouble(val);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      row = in.readInt();
+      col = in.readInt();
+      val = in.readDouble();
+    }
+
+    @Override
+    public String toString() {
+      return "(" + row + ',' + col + "):" + val;
+    }
+  }
+}
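
To complement the javadoc example above (and again not part of this commit), the other distributed operations follow the same pattern: construct the matrix over an existing SequenceFile of <IntWritable, VectorWritable> rows, call setConf(), and each operation runs as an M/R job. The HDFS paths and dimensions below are placeholders.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.mahout.math.Vector;
    import org.apache.mahout.math.hadoop.DistributedRowMatrix;

    public class DrmSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        DistributedRowMatrix a = new DistributedRowMatrix(
            new Path("/data/matrixA"), new Path("/tmp/drm"), 1000, 100);
        DistributedRowMatrix b = new DistributedRowMatrix(
            new Path("/data/matrixB"), new Path("/tmp/drm"), 1000, 50);
        a.setConf(conf);
        b.setConf(conf);

        // A'.times(B): both operands must have the same number of rows; the result is 100 x 50.
        DistributedRowMatrix product = a.times(b);

        // Column-wise mean of A, computed by MatrixColumnMeansJob.
        Vector means = a.columnMeans("DenseVector");
        System.out.println("first column mean: " + means.get(0));

        // Explicit transpose, written next to the row path.
        DistributedRowMatrix aT = a.transpose();
        System.out.println("transpose has " + aT.numRows() + " rows");
      }
    }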

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/MatrixColumnMeansJob.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/MatrixColumnMeansJob.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/MatrixColumnMeansJob.java
new file mode 100644
index 0000000..b4f459a
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/MatrixColumnMeansJob.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.mahout.math.hadoop;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.mahout.common.ClassUtils;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterator;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.function.Functions;
+
+import com.google.common.io.Closeables;
+
+/**
+ * MatrixColumnMeansJob is a job for calculating the column-wise mean of a
+ * DistributedRowMatrix. This job can be accessed using
+ * DistributedRowMatrix.columnMeans()
+ */
+public final class MatrixColumnMeansJob {
+
+  public static final String VECTOR_CLASS =
+    "DistributedRowMatrix.columnMeans.vector.class";
+
+  private MatrixColumnMeansJob() {
+  }
+
+  public static Vector run(Configuration conf,
+                           Path inputPath,
+                           Path outputVectorTmpPath) throws IOException {
+    return run(conf, inputPath, outputVectorTmpPath, null);
+  }
+
+  /**
+   * Job for calculating column-wise mean of a DistributedRowMatrix
+   *
+   * @param initialConf
+   * @param inputPath
+   *          path to DistributedRowMatrix input
+   * @param outputVectorTmpPath
+   *          path for temporary files created during job
+   * @param vectorClass
+   *          String of desired class for returned vector e.g. DenseVector,
+   *          RandomAccessSparseVector (may be null for {@link DenseVector} )
+   * @return Vector containing column-wise mean of DistributedRowMatrix
+   */
+  public static Vector run(Configuration initialConf,
+                           Path inputPath,
+                           Path outputVectorTmpPath,
+                           String vectorClass) throws IOException {
+
+    try {
+      initialConf.set(VECTOR_CLASS,
+                      vectorClass == null ? DenseVector.class.getName()
+                          : vectorClass);
+
+      Job job = new Job(initialConf, "MatrixColumnMeansJob");
+      job.setJarByClass(MatrixColumnMeansJob.class);
+
+      FileOutputFormat.setOutputPath(job, outputVectorTmpPath);
+      
+      outputVectorTmpPath.getFileSystem(job.getConfiguration())
+                         .delete(outputVectorTmpPath, true);
+      job.setNumReduceTasks(1);
+      FileOutputFormat.setOutputPath(job, outputVectorTmpPath);
+      FileInputFormat.addInputPath(job, inputPath);
+      job.setInputFormatClass(SequenceFileInputFormat.class);
+      job.setOutputFormatClass(SequenceFileOutputFormat.class);
+      FileOutputFormat.setOutputPath(job, outputVectorTmpPath);
+
+      job.setMapperClass(MatrixColumnMeansMapper.class);
+      job.setReducerClass(MatrixColumnMeansReducer.class);
+      job.setMapOutputKeyClass(NullWritable.class);
+      job.setMapOutputValueClass(VectorWritable.class);
+      job.setOutputKeyClass(IntWritable.class);
+      job.setOutputValueClass(VectorWritable.class);
+      job.submit();
+      job.waitForCompletion(true);
+
+      Path tmpFile = new Path(outputVectorTmpPath, "part-r-00000");
+      SequenceFileValueIterator<VectorWritable> iterator =
+        new SequenceFileValueIterator<>(tmpFile, true, initialConf);
+      try {
+        if (iterator.hasNext()) {
+          return iterator.next().get();
+        } else {
+          return (Vector) Class.forName(vectorClass).getConstructor(int.class)
+                               .newInstance(0);
+        }
+      } finally {
+        Closeables.close(iterator, true);
+      }
+    } catch (IOException ioe) {
+      throw ioe;
+    } catch (Throwable thr) {
+      throw new IOException(thr);
+    }
+  }
+
+  /**
+   * Mapper for calculation of column-wise mean.
+   */
+  public static class MatrixColumnMeansMapper extends
+      Mapper<Writable, VectorWritable, NullWritable, VectorWritable> {
+
+    private Vector runningSum;
+    private String vectorClass;
+
+    @Override
+    public void setup(Context context) {
+      vectorClass = context.getConfiguration().get(VECTOR_CLASS);
+    }
+
+    /**
+     * The mapper computes a running sum of the vectors the task has seen.
+     * Element 0 of the running sum vector contains a count of the number of
+     * vectors that have been seen. The remaining elements contain the
+     * column-wise running sum. Nothing is written at this stage
+     */
+    @Override
+    public void map(Writable r, VectorWritable v, Context context)
+      throws IOException {
+      if (runningSum == null) {
+          /*
+           * If this is the first vector the mapper has seen, instantiate a new
+           * vector using the parameter VECTOR_CLASS
+           */
+        runningSum = ClassUtils.instantiateAs(vectorClass,
+                                              Vector.class,
+                                              new Class<?>[] { int.class },
+                                              new Object[] { v.get().size() + 1 });
+        runningSum.set(0, 1);
+        runningSum.viewPart(1, v.get().size()).assign(v.get());
+      } else {
+        runningSum.set(0, runningSum.get(0) + 1);
+        runningSum.viewPart(1, v.get().size()).assign(v.get(), Functions.PLUS);
+      }
+    }
+
+    /**
+     * The column-wise sum is written at the cleanup stage. A single reducer is
+     * forced so null can be used for the key
+     */
+    @Override
+    public void cleanup(Context context) throws InterruptedException,
+      IOException {
+      if (runningSum != null) {
+        context.write(NullWritable.get(), new VectorWritable(runningSum));
+      }
+    }
+
+  }
+
+  /**
+   * The reducer adds the partial column-wise sums from each of the mappers to
+   * compute the total column-wise sum. The total sum is then divided by the
+   * total count of vectors to determine the column-wise mean.
+   */
+  public static class MatrixColumnMeansReducer extends
+      Reducer<NullWritable, VectorWritable, IntWritable, VectorWritable> {
+
+    private static final IntWritable ONE = new IntWritable(1);
+
+    private String vectorClass;
+    private Vector outputVector;
+    private final VectorWritable outputVectorWritable = new VectorWritable();
+
+    @Override
+    public void setup(Context context) {
+      vectorClass = context.getConfiguration().get(VECTOR_CLASS);
+    }
+
+    @Override
+    public void reduce(NullWritable n,
+                       Iterable<VectorWritable> vectors,
+                       Context context) throws IOException, InterruptedException {
+
+      /**
+       * Add together partial column-wise sums from mappers
+       */
+      for (VectorWritable v : vectors) {
+        if (outputVector == null) {
+          outputVector = v.get();
+        } else {
+          outputVector.assign(v.get(), Functions.PLUS);
+        }
+      }
+
+      /**
+       * Divide total column-wise sum by count of vectors, which corresponds to
+       * the number of rows in the DistributedRowMatrix
+       */
+      if (outputVector != null) {
+        outputVectorWritable.set(outputVector.viewPart(1,
+                                                       outputVector.size() - 1)
+                                             .divide(outputVector.get(0)));
+        context.write(ONE, outputVectorWritable);
+      } else {
+        Vector emptyVector = ClassUtils.instantiateAs(vectorClass,
+                                                      Vector.class,
+                                                      new Class<?>[] { int.class },
+                                                      new Object[] { 0 });
+        context.write(ONE, new VectorWritable(emptyVector));
+      }
+    }
+  }
+
+}
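
For reference (not part of this commit), the job is normally reached through DistributedRowMatrix.columnMeans(), but the static run() method can also be called directly; the paths below are placeholders. Note that run() expects a fully qualified vector class name, which is why columnMeans() prepends "org.apache.mahout.math." to the simple name it is given.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.mahout.math.Vector;
    import org.apache.mahout.math.hadoop.MatrixColumnMeansJob;

    public class ColumnMeansSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path matrixPath = new Path("/data/matrixA");       // existing <IntWritable, VectorWritable> rows
        Path tmpPath = new Path("/tmp/column-means");      // scratch space for the job output
        Vector mean = MatrixColumnMeansJob.run(conf, matrixPath, tmpPath,
            "org.apache.mahout.math.DenseVector");
        System.out.println("column means: " + mean);
      }
    }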

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/MatrixMultiplicationJob.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/MatrixMultiplicationJob.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/MatrixMultiplicationJob.java
new file mode 100644
index 0000000..48eda08
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/MatrixMultiplicationJob.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.join.CompositeInputFormat;
+import org.apache.hadoop.mapred.join.TupleWritable;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.SequentialAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.function.Functions;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This still uses the old MR api and, as with all MapReduce code in Mahout, is now part of 'mahout-mr'.
+ * There is no plan to convert the old MR api used here to the new MR api.
+ * This will be replaced by the new Spark based Linear Algebra bindings.
+ */
+
+public class MatrixMultiplicationJob extends AbstractJob {
+
+  private static final String OUT_CARD = "output.vector.cardinality";
+
+  public static Configuration createMatrixMultiplyJobConf(Path aPath, 
+                                                          Path bPath, 
+                                                          Path outPath, 
+                                                          int outCardinality) {
+    return createMatrixMultiplyJobConf(new Configuration(), aPath, bPath, outPath, outCardinality);
+  }
+  
+  public static Configuration createMatrixMultiplyJobConf(Configuration initialConf,
+                                                          Path aPath, 
+                                                          Path bPath, 
+                                                          Path outPath, 
+                                                          int outCardinality) {
+    JobConf conf = new JobConf(initialConf, MatrixMultiplicationJob.class);
+    conf.setInputFormat(CompositeInputFormat.class);
+    conf.set("mapred.join.expr", CompositeInputFormat.compose(
+          "inner", SequenceFileInputFormat.class, aPath, bPath));
+    conf.setInt(OUT_CARD, outCardinality);
+    conf.setOutputFormat(SequenceFileOutputFormat.class);
+    FileOutputFormat.setOutputPath(conf, outPath);
+    conf.setMapperClass(MatrixMultiplyMapper.class);
+    conf.setCombinerClass(MatrixMultiplicationReducer.class);
+    conf.setReducerClass(MatrixMultiplicationReducer.class);
+    conf.setMapOutputKeyClass(IntWritable.class);
+    conf.setMapOutputValueClass(VectorWritable.class);
+    conf.setOutputKeyClass(IntWritable.class);
+    conf.setOutputValueClass(VectorWritable.class);
+    return conf;
+  }
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new MatrixMultiplicationJob(), args);
+  }
+
+  @Override
+  public int run(String[] strings) throws Exception {
+    addOption("numRowsA", "nra", "Number of rows of the first input matrix", 
true);
+    addOption("numColsA", "nca", "Number of columns of the first input 
matrix", true);
+    addOption("numRowsB", "nrb", "Number of rows of the second input matrix", 
true);
+
+    addOption("numColsB", "ncb", "Number of columns of the second input 
matrix", true);
+    addOption("inputPathA", "ia", "Path to the first input matrix", true);
+    addOption("inputPathB", "ib", "Path to the second input matrix", true);
+
+    addOption("outputPath", "op", "Path to the output matrix", false);
+
+    Map<String, List<String>> argMap = parseArguments(strings);
+    if (argMap == null) {
+      return -1;
+    }
+
+    DistributedRowMatrix a = new DistributedRowMatrix(new Path(getOption("inputPathA")),
+                                                      new Path(getOption("tempDir")),
+                                                      Integer.parseInt(getOption("numRowsA")),
+                                                      Integer.parseInt(getOption("numColsA")));
+    DistributedRowMatrix b = new DistributedRowMatrix(new Path(getOption("inputPathB")),
+                                                      new Path(getOption("tempDir")),
+                                                      Integer.parseInt(getOption("numRowsB")),
+                                                      Integer.parseInt(getOption("numColsB")));
+
+    a.setConf(new Configuration(getConf()));
+    b.setConf(new Configuration(getConf()));
+
+    if (hasOption("outputPath")) {
+      a.times(b, new Path(getOption("outputPath")));
+    } else {
+      a.times(b);
+    }
+
+    return 0;
+  }
+
+  public static class MatrixMultiplyMapper extends MapReduceBase
+      implements Mapper<IntWritable,TupleWritable,IntWritable,VectorWritable> {
+
+    private int outCardinality;
+    private final IntWritable row = new IntWritable();
+
+    @Override
+    public void configure(JobConf conf) {
+      outCardinality = conf.getInt(OUT_CARD, Integer.MAX_VALUE);
+    }
+
+    @Override
+    public void map(IntWritable index,
+                    TupleWritable v,
+                    OutputCollector<IntWritable,VectorWritable> out,
+                    Reporter reporter) throws IOException {
+      boolean firstIsOutFrag =  ((VectorWritable)v.get(0)).get().size() == outCardinality;
+      Vector outFrag = firstIsOutFrag ? ((VectorWritable)v.get(0)).get() : ((VectorWritable)v.get(1)).get();
+      Vector multiplier = firstIsOutFrag ? ((VectorWritable)v.get(1)).get() : ((VectorWritable)v.get(0)).get();
+
+      VectorWritable outVector = new VectorWritable();
+      for (Vector.Element e : multiplier.nonZeroes()) {
+        row.set(e.index());
+        outVector.set(outFrag.times(e.get()));
+        out.collect(row, outVector);
+      }
+    }
+  }
+
+  public static class MatrixMultiplicationReducer extends MapReduceBase
+      implements Reducer<IntWritable,VectorWritable,IntWritable,VectorWritable> {
+
+    @Override
+    public void reduce(IntWritable rowNum,
+                       Iterator<VectorWritable> it,
+                       OutputCollector<IntWritable,VectorWritable> out,
+                       Reporter reporter) throws IOException {
+      if (!it.hasNext()) {
+        return;
+      }
+      Vector accumulator = new RandomAccessSparseVector(it.next().get());
+      while (it.hasNext()) {
+        Vector row = it.next().get();
+        accumulator.assign(row, Functions.PLUS);
+      }
+      out.collect(rowNum, new VectorWritable(new SequentialAccessSparseVector(accumulator)));
+    }
+  }
+
+}
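
For reference (not part of this commit), main() makes the job runnable through ToolRunner; a sketch of a programmatic invocation is below. The paths and dimensions are placeholders, and the flag spellings assume AbstractJob's usual --<optionName> convention.

    import org.apache.hadoop.util.ToolRunner;
    import org.apache.mahout.math.hadoop.MatrixMultiplicationJob;

    public class MatrixMultiplySketch {
      public static void main(String[] args) throws Exception {
        // Computes A'.times(B); both inputs must have the same number of rows.
        ToolRunner.run(new MatrixMultiplicationJob(), new String[] {
            "--inputPathA", "/data/matrixA", "--numRowsA", "1000", "--numColsA", "100",
            "--inputPathB", "/data/matrixB", "--numRowsB", "1000", "--numColsB", "50",
            "--outputPath", "/data/productAtB", "--tempDir", "/tmp/matrix-multiply"
        });
      }
    }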

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java
new file mode 100644
index 0000000..e234eb9
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop;
+
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterator;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.function.Functions;
+
+import com.google.common.base.Preconditions;
+
+import java.io.IOException;
+import java.net.URI;
+
+public final class TimesSquaredJob {
+
+  public static final String INPUT_VECTOR = "DistributedMatrix.times.inputVector";
+  public static final String IS_SPARSE_OUTPUT = "DistributedMatrix.times.outputVector.sparse";
+  public static final String OUTPUT_VECTOR_DIMENSION = "DistributedMatrix.times.output.dimension";
+
+  public static final String OUTPUT_VECTOR_FILENAME = "DistributedMatrix.times.outputVector";
+
+  private TimesSquaredJob() { }
+
+  public static Job createTimesSquaredJob(Vector v, Path matrixInputPath, Path outputVectorPath)
+    throws IOException {
+    return createTimesSquaredJob(new Configuration(), v, matrixInputPath, outputVectorPath);
+  }
+  
+  public static Job createTimesSquaredJob(Configuration initialConf, Vector v, 
Path matrixInputPath,
+                                          Path outputVectorPath) throws 
IOException {
+
+    return createTimesSquaredJob(initialConf, v, matrixInputPath, 
outputVectorPath, TimesSquaredMapper.class,
+                                 VectorSummingReducer.class);
+  }
+
+  public static Job createTimesJob(Vector v, int outDim, Path matrixInputPath, 
Path outputVectorPath)
+    throws IOException {
+
+    return createTimesJob(new Configuration(), v, outDim, matrixInputPath, 
outputVectorPath);
+  }
+    
+  public static Job createTimesJob(Configuration initialConf, Vector v, int 
outDim, Path matrixInputPath,
+                                   Path outputVectorPath) throws IOException {
+
+    return createTimesSquaredJob(initialConf, v, outDim, matrixInputPath, 
outputVectorPath, TimesMapper.class,
+                                 VectorSummingReducer.class);
+  }
+
+  public static Job createTimesSquaredJob(Vector v, Path matrixInputPath, Path 
outputVectorPathBase,
+      Class<? extends TimesSquaredMapper> mapClass, Class<? extends 
VectorSummingReducer> redClass) throws IOException {
+
+    return createTimesSquaredJob(new Configuration(), v, matrixInputPath, 
outputVectorPathBase, mapClass, redClass);
+  }
+  
+  public static Job createTimesSquaredJob(Configuration initialConf, Vector v, 
Path matrixInputPath,
+      Path outputVectorPathBase, Class<? extends TimesSquaredMapper> mapClass,
+      Class<? extends VectorSummingReducer> redClass) throws IOException {
+
+    return createTimesSquaredJob(initialConf, v, v.size(), matrixInputPath, 
outputVectorPathBase, mapClass, redClass);
+  }
+
+  public static Job createTimesSquaredJob(Vector v, int outputVectorDim, Path 
matrixInputPath,
+      Path outputVectorPathBase, Class<? extends TimesSquaredMapper> mapClass,
+      Class<? extends VectorSummingReducer> redClass) throws IOException {
+
+    return createTimesSquaredJob(new Configuration(), v, outputVectorDim, 
matrixInputPath, outputVectorPathBase,
+        mapClass, redClass);
+  }
+  
+  public static Job createTimesSquaredJob(Configuration initialConf, Vector v, 
int outputVectorDim,
+      Path matrixInputPath, Path outputVectorPathBase, Class<? extends 
TimesSquaredMapper> mapClass,
+      Class<? extends VectorSummingReducer> redClass) throws IOException {
+
+    FileSystem fs = FileSystem.get(matrixInputPath.toUri(), initialConf);
+    matrixInputPath = fs.makeQualified(matrixInputPath);
+    outputVectorPathBase = fs.makeQualified(outputVectorPathBase);
+
+    long now = System.nanoTime();
+    Path inputVectorPath = new Path(outputVectorPathBase, INPUT_VECTOR + '/' + 
now);
+
+
+    SequenceFile.Writer inputVectorPathWriter = null;
+
+    try {
+      inputVectorPathWriter = new SequenceFile.Writer(fs, initialConf, 
inputVectorPath, NullWritable.class,
+                                                      VectorWritable.class);
+      inputVectorPathWriter.append(NullWritable.get(), new VectorWritable(v));
+    } finally {
+      Closeables.close(inputVectorPathWriter, false);
+    }
+
+    URI ivpURI = inputVectorPath.toUri();
+    DistributedCache.setCacheFiles(new URI[] { ivpURI }, initialConf);
+
+    Job job = HadoopUtil.prepareJob(matrixInputPath, new 
Path(outputVectorPathBase, OUTPUT_VECTOR_FILENAME),
+        SequenceFileInputFormat.class, mapClass, NullWritable.class, 
VectorWritable.class, redClass,
+        NullWritable.class, VectorWritable.class, 
SequenceFileOutputFormat.class, initialConf);
+    job.setCombinerClass(redClass);
+    job.setJobName("TimesSquaredJob: " + matrixInputPath);
+
+    Configuration conf = job.getConfiguration();
+    conf.set(INPUT_VECTOR, ivpURI.toString());
+    conf.setBoolean(IS_SPARSE_OUTPUT, !v.isDense());
+    conf.setInt(OUTPUT_VECTOR_DIMENSION, outputVectorDim);
+
+    return job;
+  }
+
+  public static Vector retrieveTimesSquaredOutputVector(Path 
outputVectorTmpPath, Configuration conf)
+    throws IOException {
+    Path outputFile = new Path(outputVectorTmpPath, OUTPUT_VECTOR_FILENAME + 
"/part-r-00000");
+    SequenceFileValueIterator<VectorWritable> iterator =
+        new SequenceFileValueIterator<>(outputFile, true, conf);
+    try {
+      return iterator.next().get();
+    } finally {
+      Closeables.close(iterator, true);
+    }
+  }
+
+  public static class TimesSquaredMapper<T extends WritableComparable>
+      extends Mapper<T,VectorWritable, NullWritable,VectorWritable> {
+
+    private Vector outputVector;
+    private Vector inputVector;
+
+    Vector getOutputVector() {
+      return outputVector;
+    }
+
+    @Override
+    protected void setup(Context ctx) throws IOException, InterruptedException 
{
+      try {
+        Configuration conf = ctx.getConfiguration();
+        Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
+        Preconditions.checkArgument(localFiles != null && localFiles.length >= 
1,
+            "missing paths from the DistributedCache");
+
+        Path inputVectorPath = HadoopUtil.getSingleCachedFile(conf);
+
+        SequenceFileValueIterator<VectorWritable> iterator =
+            new SequenceFileValueIterator<>(inputVectorPath, true, conf);
+        try {
+          inputVector = iterator.next().get();
+        } finally {
+          Closeables.close(iterator, true);
+        }
+
+        int outDim = conf.getInt(OUTPUT_VECTOR_DIMENSION, Integer.MAX_VALUE);
+        outputVector = conf.getBoolean(IS_SPARSE_OUTPUT, false)
+            ? new RandomAccessSparseVector(outDim, 10)
+            : new DenseVector(outDim);
+      } catch (IOException ioe) {
+        throw new IllegalStateException(ioe);
+      }
+    }
+
+    @Override
+    protected void map(T key, VectorWritable v, Context context) throws 
IOException, InterruptedException {
+
+      double d = scale(v);
+      if (d == 1.0) {
+        outputVector.assign(v.get(), Functions.PLUS);
+      } else if (d != 0.0) {
+        outputVector.assign(v.get(), Functions.plusMult(d));
+      }
+    }
+
+    protected double scale(VectorWritable v) {
+      return v.get().dot(inputVector);
+    }
+
+    @Override
+    protected void cleanup(Context ctx) throws IOException, 
InterruptedException {
+      ctx.write(NullWritable.get(), new VectorWritable(outputVector));
+    }
+
+  }
+
+  public static class TimesMapper extends TimesSquaredMapper<IntWritable> {
+
+
+    @Override
+    protected void map(IntWritable rowNum, VectorWritable v, Context context) 
throws IOException, InterruptedException {
+      double d = scale(v);
+      if (d != 0.0) {
+        getOutputVector().setQuick(rowNum.get(), d);
+      }
+    }
+  }
+
+  public static class VectorSummingReducer extends 
Reducer<NullWritable,VectorWritable,NullWritable,VectorWritable> {
+
+    private Vector outputVector;
+
+    @Override
+    protected void setup(Context ctx) throws IOException, InterruptedException 
{
+      Configuration conf = ctx.getConfiguration();
+      int outputDimension = conf.getInt(OUTPUT_VECTOR_DIMENSION, 
Integer.MAX_VALUE);
+      outputVector = conf.getBoolean(IS_SPARSE_OUTPUT, false)
+                   ? new RandomAccessSparseVector(outputDimension, 10)
+                   : new DenseVector(outputDimension);
+    }
+
+    @Override
+    protected void reduce(NullWritable key, Iterable<VectorWritable> vectors, 
Context ctx)
+      throws IOException, InterruptedException {
+
+      for (VectorWritable v : vectors) {
+        if (v != null) {
+          outputVector.assign(v.get(), Functions.PLUS);
+        }
+      }
+      ctx.write(NullWritable.get(), new VectorWritable(outputVector));
+    }
+  }
+
+}
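
For reference, a minimal sketch of how the job added above might be driven from client code, using only the factory and retrieval methods in this patch. The matrix path, working directory, and input vector are placeholders for illustration, and the usual Hadoop/Mahout imports (Configuration, Path, Job, Vector) are assumed:

  // Sketch: compute A' * (A * v) for a row-distributed matrix A stored as
  // SequenceFile<?, VectorWritable>. Paths and the vector v are hypothetical.
  static Vector timesSquared(Configuration conf, Vector v, Path matrixPath, Path workDir) throws Exception {
    Job job = TimesSquaredJob.createTimesSquaredJob(conf, v, matrixPath, workDir);
    if (!job.waitForCompletion(true)) {
      throw new IllegalStateException("TimesSquaredJob failed for " + matrixPath);
    }
    // The reducer leaves a single summed vector under workDir/DistributedMatrix.times.outputVector.
    return TimesSquaredJob.retrieveTimesSquaredOutputVector(workDir, conf);
  }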

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/TransposeJob.java
----------------------------------------------------------------------
diff --git 
a/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/TransposeJob.java
 
b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/TransposeJob.java
new file mode 100644
index 0000000..60066c6
--- /dev/null
+++ 
b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/TransposeJob.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.mapreduce.MergeVectorsCombiner;
+import org.apache.mahout.common.mapreduce.MergeVectorsReducer;
+import org.apache.mahout.common.mapreduce.TransposeMapper;
+import org.apache.mahout.math.VectorWritable;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/** Transpose a matrix */
+public class TransposeJob extends AbstractJob {
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new TransposeJob(), args);
+  }
+
+  @Override
+  public int run(String[] strings) throws Exception {
+    addInputOption();
+    addOption("numRows", "nr", "Number of rows of the input matrix");
+    addOption("numCols", "nc", "Number of columns of the input matrix");
+    Map<String, List<String>> parsedArgs = parseArguments(strings);
+    if (parsedArgs == null) {
+      return -1;
+    }
+
+    int numRows = Integer.parseInt(getOption("numRows"));
+    int numCols = Integer.parseInt(getOption("numCols"));
+
+    DistributedRowMatrix matrix = new DistributedRowMatrix(getInputPath(), 
getTempPath(), numRows, numCols);
+    matrix.setConf(new Configuration(getConf()));
+    matrix.transpose();
+
+    return 0;
+  }
+
+  public static Job buildTransposeJob(Path matrixInputPath, Path 
matrixOutputPath, int numInputRows)
+    throws IOException {
+    return buildTransposeJob(new Configuration(), matrixInputPath, 
matrixOutputPath, numInputRows);
+  }
+
+  public static Job buildTransposeJob(Configuration initialConf, Path 
matrixInputPath, Path matrixOutputPath,
+      int numInputRows) throws IOException {
+
+    Job job = HadoopUtil.prepareJob(matrixInputPath, matrixOutputPath, 
SequenceFileInputFormat.class,
+        TransposeMapper.class, IntWritable.class, VectorWritable.class, 
MergeVectorsReducer.class, IntWritable.class,
+        VectorWritable.class, SequenceFileOutputFormat.class, initialConf);
+    job.setCombinerClass(MergeVectorsCombiner.class);
+    job.getConfiguration().setInt(TransposeMapper.NEW_NUM_COLS_PARAM, 
numInputRows);
+
+    job.setJobName("TransposeJob: " + matrixInputPath);
+
+    return job;
+  }
+
+
+}
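
As a hedged usage sketch (paths and the row count below are placeholder assumptions), the static buildTransposeJob factory can also be driven directly when the DistributedRowMatrix wrapper used by run() is not needed:

  // Sketch: transpose a SequenceFile<IntWritable, VectorWritable> matrix.
  Configuration conf = new Configuration();
  Path in = new Path("/tmp/matrix");            // hypothetical input
  Path out = new Path("/tmp/matrix-transpose"); // hypothetical output
  int numRows = 1000;                           // rows of the input become the column dimension of the output
  Job job = TransposeJob.buildTransposeJob(conf, in, out, numRows);
  if (!job.waitForCompletion(true)) {
    throw new IllegalStateException("TransposeJob failed");
  }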

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/DistributedLanczosSolver.java
----------------------------------------------------------------------
diff --git 
a/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/DistributedLanczosSolver.java
 
b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/DistributedLanczosSolver.java
new file mode 100644
index 0000000..86c0f6b
--- /dev/null
+++ 
b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/DistributedLanczosSolver.java
@@ -0,0 +1,299 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.decomposer;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.NamedVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorIterable;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.decomposer.lanczos.LanczosSolver;
+import org.apache.mahout.math.decomposer.lanczos.LanczosState;
+import org.apache.mahout.math.hadoop.DistributedRowMatrix;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * See the SSVD code for a better option than using this:
+ *
+ * http://mahout.apache.org/users/dim-reduction/ssvd.html
+ * @see org.apache.mahout.math.hadoop.stochasticsvd.SSVDSolver
+ */
+@Deprecated
+public class DistributedLanczosSolver extends LanczosSolver implements Tool {
+
+  public static final String RAW_EIGENVECTORS = "rawEigenvectors";
+
+  private static final Logger log = 
LoggerFactory.getLogger(DistributedLanczosSolver.class);
+
+  private Configuration conf;
+
+  private Map<String, List<String>> parsedArgs;
+
+  /**
+   * For the distributed case, the best guess at a useful initialization state for Lanczos is chosen to be
+   * uniform over all input dimensions, L_2 normalized.
+   */
+  public static Vector getInitialVector(VectorIterable corpus) {
+    Vector initialVector = new DenseVector(corpus.numCols());
+    initialVector.assign(1.0 / Math.sqrt(corpus.numCols()));
+    return initialVector;
+  }
+
+  public LanczosState runJob(Configuration originalConfig,
+                             LanczosState state,
+                             int desiredRank,
+                             boolean isSymmetric,
+                             String outputEigenVectorPathString) throws 
IOException {
+    ((Configurable) state.getCorpus()).setConf(new 
Configuration(originalConfig));
+    setConf(originalConfig);
+    solve(state, desiredRank, isSymmetric);
+    serializeOutput(state, new Path(outputEigenVectorPathString));
+    return state;
+  }
+
+  /**
+   * Factored-out LanczosSolver for the purpose of invoking it programmatically
+   */
+  public LanczosState runJob(Configuration originalConfig,
+                             Path inputPath,
+                             Path outputTmpPath,
+                             int numRows,
+                             int numCols,
+                             boolean isSymmetric,
+                             int desiredRank,
+                             String outputEigenVectorPathString) throws 
IOException {
+    DistributedRowMatrix matrix = new DistributedRowMatrix(inputPath, 
outputTmpPath, numRows, numCols);
+    matrix.setConf(new Configuration(originalConfig));
+    LanczosState state = new LanczosState(matrix, desiredRank, 
getInitialVector(matrix));
+    return runJob(originalConfig, state, desiredRank, isSymmetric, 
outputEigenVectorPathString);
+  }
+
+  @Override
+  public int run(String[] strings) throws Exception {
+    Path inputPath = new Path(AbstractJob.getOption(parsedArgs, "--input"));
+    Path outputPath = new Path(AbstractJob.getOption(parsedArgs, "--output"));
+    Path outputTmpPath = new Path(AbstractJob.getOption(parsedArgs, 
"--tempDir"));
+    Path workingDirPath = AbstractJob.getOption(parsedArgs, "--workingDir") != 
null
+                        ? new Path(AbstractJob.getOption(parsedArgs, 
"--workingDir")) : null;
+    int numRows = Integer.parseInt(AbstractJob.getOption(parsedArgs, 
"--numRows"));
+    int numCols = Integer.parseInt(AbstractJob.getOption(parsedArgs, 
"--numCols"));
+    boolean isSymmetric = 
Boolean.parseBoolean(AbstractJob.getOption(parsedArgs, "--symmetric"));
+    int desiredRank = Integer.parseInt(AbstractJob.getOption(parsedArgs, 
"--rank"));
+
+    boolean cleansvd = Boolean.parseBoolean(AbstractJob.getOption(parsedArgs, 
"--cleansvd"));
+    if (cleansvd) {
+      double maxError = Double.parseDouble(AbstractJob.getOption(parsedArgs, 
"--maxError"));
+      double minEigenvalue = 
Double.parseDouble(AbstractJob.getOption(parsedArgs, "--minEigenvalue"));
+      boolean inMemory = 
Boolean.parseBoolean(AbstractJob.getOption(parsedArgs, "--inMemory"));
+      return run(inputPath,
+                 outputPath,
+                 outputTmpPath,
+                 workingDirPath,
+                 numRows,
+                 numCols,
+                 isSymmetric,
+                 desiredRank,
+                 maxError,
+                 minEigenvalue,
+                 inMemory);
+    }
+    return run(inputPath, outputPath, outputTmpPath, workingDirPath, numRows, 
numCols, isSymmetric, desiredRank);
+  }
+
+  /**
+   * Run the solver to produce raw eigenvectors, then run the 
EigenVerificationJob to clean them
+   * 
+   * @param inputPath the Path to the input corpus
+   * @param outputPath the Path to the output
+   * @param outputTmpPath a Path to a temporary working directory
+   * @param numRows the int number of rows 
+   * @param numCols the int number of columns
+   * @param isSymmetric true if the input matrix is symmetric
+   * @param desiredRank the int desired rank of eigenvectors to produce
+   * @param maxError the maximum allowable error
+   * @param minEigenvalue the minimum usable eigenvalue
+   * @param inMemory true if the verification can be done in memory
+   * @return an int indicating success (0) or otherwise
+   */
+  public int run(Path inputPath,
+                 Path outputPath,
+                 Path outputTmpPath,
+                 Path workingDirPath,
+                 int numRows,
+                 int numCols,
+                 boolean isSymmetric,
+                 int desiredRank,
+                 double maxError,
+                 double minEigenvalue,
+                 boolean inMemory) throws Exception {
+    int result = run(inputPath, outputPath, outputTmpPath, workingDirPath, 
numRows, numCols,
+        isSymmetric, desiredRank);
+    if (result != 0) {
+      return result;
+    }
+    Path rawEigenVectorPath = new Path(outputPath, RAW_EIGENVECTORS);
+    return new EigenVerificationJob().run(inputPath,
+                                          rawEigenVectorPath,
+                                          outputPath,
+                                          outputTmpPath,
+                                          maxError,
+                                          minEigenvalue,
+                                          inMemory,
+                                          getConf() != null ? new 
Configuration(getConf()) : new Configuration());
+  }
+
+  /**
+   * Run the solver to produce the raw eigenvectors
+   * 
+   * @param inputPath the Path to the input corpus
+   * @param outputPath the Path to the output
+   * @param outputTmpPath a Path to a temporary working directory
+   * @param numRows the int number of rows 
+   * @param numCols the int number of columns
+   * @param isSymmetric true if the input matrix is symmetric
+   * @param desiredRank the int desired rank of eigenvectors to produce
+   * @return  an int indicating success (0) or otherwise
+   */
+  public int run(Path inputPath,
+                 Path outputPath,
+                 Path outputTmpPath,
+                 Path workingDirPath,
+                 int numRows,
+                 int numCols,
+                 boolean isSymmetric,
+                 int desiredRank) throws Exception {
+    DistributedRowMatrix matrix = new DistributedRowMatrix(inputPath, 
outputTmpPath, numRows, numCols);
+    matrix.setConf(new Configuration(getConf() != null ? getConf() : new 
Configuration()));
+
+    LanczosState state;
+    if (workingDirPath == null) {
+      state = new LanczosState(matrix, desiredRank, getInitialVector(matrix));
+    } else {
+      HdfsBackedLanczosState hState =
+          new HdfsBackedLanczosState(matrix, desiredRank, 
getInitialVector(matrix), workingDirPath);
+      hState.setConf(matrix.getConf());
+      state = hState;
+    }
+    solve(state, desiredRank, isSymmetric);
+
+    Path outputEigenVectorPath = new Path(outputPath, RAW_EIGENVECTORS);
+    serializeOutput(state, outputEigenVectorPath);
+    return 0;
+  }
+
+  /**
+   * @param state The final LanczosState to be serialized
+   * @param outputPath The path (relative to the current Configuration's 
FileSystem) to save the output to.
+   */
+  public void serializeOutput(LanczosState state, Path outputPath) throws 
IOException {
+    int numEigenVectors = state.getIterationNumber();
+    log.info("Persisting {} eigenVectors and eigenValues to: {}", 
numEigenVectors, outputPath); 
+    Configuration conf = getConf() != null ? getConf() : new Configuration();
+    FileSystem fs = FileSystem.get(outputPath.toUri(), conf);
+    SequenceFile.Writer seqWriter =
+        new SequenceFile.Writer(fs, conf, outputPath, IntWritable.class, 
VectorWritable.class);
+    try {
+      IntWritable iw = new IntWritable();
+      for (int i = 0; i < numEigenVectors; i++) {
+        // Persist eigenvectors sorted by eigenvalues in descending order
+        NamedVector v = new 
NamedVector(state.getRightSingularVector(numEigenVectors - 1 - i),
+            "eigenVector" + i + ", eigenvalue = " + 
state.getSingularValue(numEigenVectors - 1 - i));
+        Writable vw = new VectorWritable(v);
+        iw.set(i);
+        seqWriter.append(iw, vw);
+      }
+    } finally {
+      Closeables.close(seqWriter, false);
+    }
+  }
+
+  @Override
+  public void setConf(Configuration configuration) {
+    conf = configuration;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public DistributedLanczosSolverJob job() {
+    return new DistributedLanczosSolverJob();
+  }
+
+  /**
+   * Inner subclass of AbstractJob so we get access to AbstractJob's functionality w.r.t. command-line options,
+   * while still subclassing LanczosSolver.
+   */
+  public class DistributedLanczosSolverJob extends AbstractJob {
+    @Override
+    public void setConf(Configuration conf) {
+      DistributedLanczosSolver.this.setConf(conf);
+    }
+
+    @Override
+    public Configuration getConf() {
+      return DistributedLanczosSolver.this.getConf();
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+      addInputOption();
+      addOutputOption();
+      addOption("numRows", "nr", "Number of rows of the input matrix");
+      addOption("numCols", "nc", "Number of columns of the input matrix");
+      addOption("rank", "r", "Desired decomposition rank (note: only roughly 
1/4 to 1/3 "
+          + "of these will have the top portion of the spectrum)");
+      addOption("symmetric", "sym", "Is the input matrix square and 
symmetric?");
+      addOption("workingDir", "wd", "Working directory path to store Lanczos 
basis vectors "
+                                    + "(to be used on restarts, and to avoid 
too much RAM usage)");
+      // options required to run cleansvd job
+      addOption("cleansvd", "cl", "Run the EigenVerificationJob to clean the 
eigenvectors after SVD", false);
+      addOption("maxError", "err", "Maximum acceptable error", "0.05");
+      addOption("minEigenvalue", "mev", "Minimum eigenvalue to keep the vector 
for", "0.0");
+      addOption("inMemory", "mem", "Buffer eigen matrix into memory (if you 
have enough!)", "false");
+
+      DistributedLanczosSolver.this.parsedArgs = parseArguments(args);
+      if (DistributedLanczosSolver.this.parsedArgs == null) {
+        return -1;
+      } else {
+        return DistributedLanczosSolver.this.run(args);
+      }
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new DistributedLanczosSolver().job(), args);
+  }
+}
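
For completeness, a rough sketch of programmatic use of the (deprecated) solver, calling only the runJob overload defined above; all paths and sizes are illustrative assumptions, and the call may throw IOException:

  // Sketch: run Lanczos over a row-distributed corpus and write raw eigenvectors.
  Configuration conf = new Configuration();
  DistributedLanczosSolver solver = new DistributedLanczosSolver();
  LanczosState state = solver.runJob(conf,
      new Path("/tmp/corpus"),        // row-matrix input (assumed)
      new Path("/tmp/lanczos-tmp"),   // temporary working directory (assumed)
      100000,                         // numRows (assumed)
      5000,                           // numCols (assumed)
      true,                           // isSymmetric
      50,                             // desiredRank
      "/tmp/rawEigenvectors");        // where serializeOutput() writes the eigenvectors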

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVector.java
----------------------------------------------------------------------
diff --git 
a/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVector.java
 
b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVector.java
new file mode 100644
index 0000000..d2f0c8c
--- /dev/null
+++ 
b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVector.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.decomposer;
+
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.NamedVector;
+import org.apache.mahout.math.Vector;
+
+import java.util.regex.Pattern;
+
+/**
+ * TODO this is a horrible hack.  Make a proper writable subclass also.
+ */
+public class EigenVector extends NamedVector {
+
+  private static final Pattern EQUAL_PATTERN = Pattern.compile(" = ");
+  private static final Pattern PIPE_PATTERN = Pattern.compile("\\|");
+
+  public EigenVector(Vector v, double eigenValue, double cosAngleError, int 
order) {
+    super(v instanceof DenseVector ? (DenseVector) v : new DenseVector(v),
+        "e|" + order + "| = |" + eigenValue + "|, err = " + cosAngleError);
+  }
+
+  public double getEigenValue() {
+    return getEigenValue(getName());
+  }
+
+  public double getCosAngleError() {
+    return getCosAngleError(getName());
+  }
+
+  public int getIndex() {
+    return getIndex(getName());
+  }
+
+  public static double getEigenValue(CharSequence name) {
+    return parseMetaData(name)[1];
+  }
+
+  public static double getCosAngleError(CharSequence name) {
+    return parseMetaData(name)[2];
+  }
+
+  public static int getIndex(CharSequence name) {
+    return (int)parseMetaData(name)[0];
+  }
+
+  public static double[] parseMetaData(CharSequence name) {
+    double[] m = new double[3];
+    String[] s = EQUAL_PATTERN.split(name);
+    m[0] = Double.parseDouble(PIPE_PATTERN.split(s[0])[1]);
+    m[1] = Double.parseDouble(PIPE_PATTERN.split(s[1])[1]);
+    m[2] = Double.parseDouble(s[2].substring(1));
+    return m;
+  }
+
+  protected double[] parseMetaData() {
+    return parseMetaData(getName());
+  }
+
+}
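
A small sketch of the name-encoding round trip this class relies on; the values below are arbitrary and chosen only to illustrate the parse:

  // Sketch: metadata is packed into the NamedVector name and parsed back out.
  Vector raw = new DenseVector(new double[] {0.1, 0.2, 0.3});
  EigenVector ev = new EigenVector(raw, 2.5, 0.01, 7);
  // ev.getName() looks like: e|7| = |2.5|, err = 0.01
  int order = ev.getIndex();               // 7
  double eigenValue = ev.getEigenValue();  // 2.5
  double err = ev.getCosAngleError();      // 0.01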
