http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/Omega.java
----------------------------------------------------------------------
diff --git 
a/mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/Omega.java 
b/mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/Omega.java
new file mode 100644
index 0000000..a5f32ad
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/Omega.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.stochasticsvd;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Lists;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.Vector.Element;
+
+/**
+ * simplistic implementation for Omega matrix in Stochastic SVD method
+ */
+public class Omega {
+
+  private static final double UNIFORM_DIVISOR = Math.pow(2.0, 64); // 2^64: scales the signed 64-bit hash in getQuick()
+
+  private final long seed; // seed handed to murmur64 for every generated element
+  private final int kp; // number of Omega columns walked by accumDots and the multithreaded multiply
+
+  public Omega(long seed, int kp) { // only stores the seed and column count; elements are hashed on demand
+    this.seed = seed;
+    this.kp = kp;
+  }
+
+  /**
+   * Returns the omega element at (row, column), derived on demand from a
+   * seeded murmur hash of the packed coordinates.
+   * <p>
+   * The signed 64-bit hash is divided by 2^64, so the result lies within
+   * [-0.5, 0.5]; the upper bound is reachable because converting a long close
+   * to {@code Long.MAX_VALUE} to double rounds it up to 2^63.
+   *
+   * @param row
+   *          omega row
+   * @param column
+   *          omega column
+   * @return the element value; identical for identical (seed, row, column)
+   */
+  public double getQuick(int row, int column) {
+    // row occupies the upper 32 bits of the hash key, column is OR-ed into the
+    // lower half (a negative column would sign-extend into the row bits)
+    long key = (long) row << Integer.SIZE | column;
+    long hash = murmur64(key, 8, seed);
+    return hash / UNIFORM_DIVISOR;
+  }
+
+  /**
+   * Computes one row of Y as the product of a row of A with Omega, writing the
+   * result into a caller-supplied array.
+   *
+   * @param aRow
+   *          row of matrix A (size n)
+   * @param yRow
+   *          output row of matrix Y; it is zeroed first and must already be
+   *          allocated to size (k+p)
+   */
+  @Deprecated
+  public void computeYRow(Vector aRow, double[] yRow) {
+    Arrays.fill(yRow, 0.0);
+
+    if (!aRow.isDense()) {
+      // sparse input: only the stored non-zero elements contribute
+      for (Element nonZero : aRow.nonZeroes()) {
+        accumDots(nonZero.index(), nonZero.get(), yRow);
+      }
+      return;
+    }
+
+    // dense input: walk every position of the row
+    final int width = aRow.size();
+    for (int col = 0; col < width; col++) {
+      accumDots(col, aRow.getQuick(col), yRow);
+    }
+  }
+
+  /**
+   * Variant that accumulates the Y row into a {@link Vector}, intended for
+   * extremely sparse matrices.
+   *
+   * @param aRow
+   *          row of matrix A
+   * @param yRowOut
+   *          output vector; it is reset to zero before accumulation
+   */
+  public void computeYRow(Vector aRow, Vector yRowOut) {
+    yRowOut.assign(0.0);
+
+    if (!aRow.isDense()) {
+      // sparse input: only the stored non-zero elements contribute
+      for (Element nonZero : aRow.nonZeroes()) {
+        accumDots(nonZero.index(), nonZero.get(), yRowOut);
+      }
+      return;
+    }
+
+    // dense input: walk every position of the row
+    final int width = aRow.size();
+    for (int col = 0; col < width; col++) {
+      accumDots(col, aRow.getQuick(col), yRowOut);
+    }
+  }
+
+  /**
+   * Computes t(Omega) %*% v, evaluating each of the {@code kp} components of
+   * the result as a separate task on a temporary thread pool sized to the
+   * number of available processors.
+   *
+   * @param v
+   *          the vector to multiply; it is only read by the worker tasks
+   * @return a new dense vector of size {@code kp}
+   * @throws IllegalStateException
+   *           if the calling thread is interrupted while waiting for a task
+   *           (the interrupt status is restored first), or if a task fails
+   *           with something other than a {@link RuntimeException}
+   */
+  public Vector mutlithreadedTRightMultiply(final Vector v) {
+
+    int nThreads = Runtime.getRuntime().availableProcessors();
+    // ArrayBlockingQueue rejects a capacity of 0, so always keep one slot
+    ExecutorService es =
+      new ThreadPoolExecutor(nThreads,
+                             nThreads,
+                             1,
+                             TimeUnit.SECONDS,
+                             new ArrayBlockingQueue<Runnable>(Math.max(1, kp)));
+
+    try {
+
+      List<Future<Double>> dotFutures = Lists.newArrayListWithCapacity(kp);
+
+      for (int i = 0; i < kp; i++) {
+        final int index = i;
+
+        dotFutures.add(es.submit(new Callable<Double>() {
+          @Override
+          public Double call() throws Exception {
+            double result = 0.0;
+            if (v.isDense()) {
+              for (int k = 0; k < v.size(); k++) {
+                // getQuick only reads immutable state, so concurrent calls are fine
+                result += getQuick(k, index) * v.getQuick(k);
+              }
+            } else {
+              for (Element el : v.nonZeroes()) {
+                result += getQuick(el.index(), index) * el.get();
+              }
+            }
+            return result;
+          }
+        }));
+      }
+
+      Vector res = new DenseVector(kp);
+      for (int i = 0; i < kp; i++) {
+        res.setQuick(i, dotFutures.get(i).get());
+      }
+      return res;
+
+    } catch (InterruptedException exc) {
+      // restore the flag so that callers further up can still observe the interrupt
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException("Interrupted", exc);
+    } catch (ExecutionException exc) {
+      Throwable cause = exc.getCause();
+      if (cause instanceof RuntimeException) {
+        throw (RuntimeException) cause;
+      }
+      throw new IllegalStateException(cause);
+    } finally {
+      // on success nothing is left to run; on failure this drops the queued tasks
+      es.shutdownNow();
+    }
+  }
+
+  /**
+   * Adds the contribution of a single element of an A row to every component
+   * of the Y row held in an array.
+   *
+   * @param aIndex
+   *          position of the element within the A row (used as the omega row)
+   * @param aElement
+   *          value of the element
+   * @param yRow
+   *          accumulator holding at least kp entries
+   */
+  protected void accumDots(int aIndex, double aElement, double[] yRow) {
+    int col = 0;
+    while (col < kp) {
+      double omegaValue = getQuick(aIndex, col);
+      yRow[col] += omegaValue * aElement;
+      col++;
+    }
+  }
+
+  /**
+   * Adds the contribution of a single element of an A row to every component
+   * of the Y row held in a vector.
+   *
+   * @param aIndex
+   *          position of the element within the A row (used as the omega row)
+   * @param aElement
+   *          value of the element
+   * @param yRow
+   *          accumulator vector; positions 0..kp-1 are updated in place
+   */
+  protected void accumDots(int aIndex, double aElement, Vector yRow) {
+    int col = 0;
+    while (col < kp) {
+      double previous = yRow.getQuick(col);
+      yRow.setQuick(col, previous + getQuick(aIndex, col) * aElement);
+      col++;
+    }
+  }
+
+  /**
+   * Murmur hash shortcut for input that is already packed into a single
+   * {@code long}: the value is mixed as one block and then finalized.
+   *
+   * @param val
+   *          the value; the data sits in the {@code len} lowest bytes
+   * @param len
+   *          number of data bytes packed into {@code val}; it only takes part
+   *          in the initial state and is not range-checked here
+   * @param seed
+   *          the seed to use
+   * @return murmur hash
+   */
+  public static long murmur64(long val, int len, long seed) {
+    final long m = 0xc6a4a7935bd1e995L;
+    final int r = 47;
+
+    // mix the single data block
+    long k = val * m;
+    k = (k ^ (k >>> r)) * m;
+
+    // fold the block into the seeded initial state
+    long h = ((seed ^ (len * m)) ^ k) * m;
+
+    // final avalanche
+    h = (h ^ (h >>> r)) * m;
+    return h ^ (h >>> r);
+  }
+
+  /**
+   * Murmur hash of {@code len} bytes of {@code val} starting at
+   * {@code offset}. Complete groups of 8 bytes are packed big-endian and mixed
+   * one by one; the 1..7 leftover bytes, if any, are packed big-endian and
+   * folded in before the final avalanche.
+   *
+   * @param val
+   *          array holding the data
+   * @param offset
+   *          index of the first byte to hash
+   * @param len
+   *          number of bytes to hash
+   * @param seed
+   *          the seed to use
+   * @return murmur hash; depends only on the hashed bytes, {@code len} and
+   *         {@code seed}, not on where the bytes sit inside {@code val}
+   */
+  public static long murmur64(byte[] val, int offset, int len, long seed) {
+
+    long m = 0xc6a4a7935bd1e995L;
+    int r = 47;
+    long h = seed ^ (len * m);
+
+    // exclusive end of the hashed range; the tail has to be bounded by this and
+    // not by len, otherwise a non-zero offset drops or truncates the tail
+    int end = offset + len;
+
+    int lt = len >>> 3;
+    for (int i = 0; i < lt; i++, offset += 8) {
+      long k = 0;
+      for (int j = 0; j < 8; j++) {
+        k <<= 8;
+        k |= val[offset + j] & 0xff;
+      }
+
+      k *= m;
+      k ^= k >>> r;
+      k *= m;
+
+      h ^= k;
+      h *= m;
+    }
+
+    if (offset < end) {
+      long k = 0;
+      while (offset < end) {
+        k <<= 8;
+        k |= val[offset] & 0xff;
+        offset++;
+      }
+      h ^= k;
+      h *= m;
+    }
+
+    h ^= h >>> r;
+    h *= m;
+    h ^= h >>> r;
+    return h;
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/QJob.java
----------------------------------------------------------------------
diff --git 
a/mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/QJob.java 
b/mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/QJob.java
new file mode 100644
index 0000000..76dc299
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/QJob.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.stochasticsvd;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Deque;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.lib.MultipleOutputs;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.mahout.common.IOUtils;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.function.Functions;
+import org.apache.mahout.math.hadoop.stochasticsvd.qr.QRFirstStep;
+
+/**
+ * Compute first level of QHat-transpose blocks.
+ * <P>
+ * 
+ * See Mahout-376 working notes for details.
+ * <P>
+ * 
+ * Uses some of Hadoop deprecated api wherever newer api is not available.
+ * Hence, @SuppressWarnings("deprecation") for imports (MAHOUT-593).
+ * <P>
+ * 
+ */
+@SuppressWarnings("deprecation")
+public final class QJob {
+
+  public static final String PROP_OMEGA_SEED = "ssvd.omegaseed"; // job property carrying the Omega hash seed
+  public static final String PROP_K = QRFirstStep.PROP_K; // same property name QRFirstStep uses for k
+  public static final String PROP_P = QRFirstStep.PROP_P; // same property name QRFirstStep uses for p
+  public static final String PROP_SB_PATH = "ssvdpca.sb.path"; // optional; when set, the mapper loads a vector from it and subtracts it from each Y row
+  public static final String PROP_AROWBLOCK_SIZE =
+    QRFirstStep.PROP_AROWBLOCK_SIZE; // populated from aBlockRows in run()
+
+  public static final String OUTPUT_RHAT = "R"; // named output that receives the R-hat vectors
+  public static final String OUTPUT_QHAT = "QHat"; // named output that receives the Q-hat blocks
+
+  private QJob() { // static entry points only; not instantiable
+  }
+
+  /**
+   * Mapper that turns every input row of A into a row of Y = A * Omega
+   * (optionally shifted by the vector loaded from {@link #PROP_SB_PATH}) and
+   * feeds it to {@link QRFirstStep}; the Q-hat blocks and R-hat vectors it
+   * produces are written through the named outputs {@link #OUTPUT_QHAT} and
+   * {@link #OUTPUT_RHAT}.
+   */
+  public static class QMapper
+      extends Mapper<Writable, VectorWritable, SplitPartitionedWritable, VectorWritable> {
+
+    private final Deque<Closeable> closeables = Lists.newLinkedList();
+    private MultipleOutputs outputs;
+    private SplitPartitionedWritable qHatKey;
+    private SplitPartitionedWritable rHatKey;
+    private Omega omega;
+    private int kp;
+    private Vector yRow;
+    private Vector sb;
+
+    private QRFirstStep qr;
+
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+      Configuration conf = context.getConfiguration();
+
+      kp = Integer.parseInt(conf.get(PROP_K)) + Integer.parseInt(conf.get(PROP_P));
+      omega = new Omega(Long.parseLong(conf.get(PROP_OMEGA_SEED)), kp);
+
+      String sbPathStr = conf.get(PROP_SB_PATH);
+      if (sbPathStr != null) {
+        sb = SSVDHelper.loadAndSumUpVectors(new Path(sbPathStr), conf);
+        if (sb == null) {
+          throw new IOException(String.format("Unable to load s_omega from path %s.", sbPathStr));
+        }
+      }
+
+      outputs = new MultipleOutputs(new JobConf(conf));
+      closeables.addFirst(new Closeable() {
+        @Override
+        public void close() throws IOException {
+          outputs.close();
+        }
+      });
+
+      qHatKey = new SplitPartitionedWritable(context);
+      rHatKey = new SplitPartitionedWritable(context);
+
+      qr = new QRFirstStep(conf, newQHatCollector(), newRHatCollector());
+      // pushed to the front after the outputs so that qr is closed before them
+      closeables.addFirst(qr);
+      yRow = new DenseVector(kp);
+    }
+
+    /** Builds the collector that writes Q-hat blocks to the {@link #OUTPUT_QHAT} named output. */
+    private OutputCollector<Writable, DenseBlockWritable> newQHatCollector() {
+      return new OutputCollector<Writable, DenseBlockWritable>() {
+        @Override
+        @SuppressWarnings("unchecked")
+        public void collect(Writable nil, DenseBlockWritable dbw) throws IOException {
+          outputs.getCollector(OUTPUT_QHAT, null).collect(qHatKey, dbw);
+          qHatKey.incrementItemOrdinal();
+        }
+      };
+    }
+
+    /** Builds the collector that writes R-hat vectors to the {@link #OUTPUT_RHAT} named output. */
+    private OutputCollector<Writable, VectorWritable> newRHatCollector() {
+      return new OutputCollector<Writable, VectorWritable>() {
+        @Override
+        @SuppressWarnings("unchecked")
+        public void collect(Writable nil, VectorWritable rhat) throws IOException {
+          outputs.getCollector(OUTPUT_RHAT, null).collect(rHatKey, rhat);
+          rHatKey.incrementItemOrdinal();
+        }
+      };
+    }
+
+    @Override
+    protected void map(Writable key, VectorWritable value, Context context)
+      throws IOException, InterruptedException {
+      Vector aRow = value.get();
+      omega.computeYRow(aRow, yRow);
+      if (sb != null) {
+        yRow.assign(sb, Functions.MINUS);
+      }
+      qr.collect(key, yRow);
+    }
+
+    @Override
+    protected void cleanup(Context context) throws IOException, InterruptedException {
+      IOUtils.close(closeables);
+    }
+  }
+
+  /**
+   * Configures and runs the map-only Q job and blocks until it finishes.
+   *
+   * @param conf base configuration
+   * @param inputPaths sequence file inputs
+   * @param sbPath optional path published to the mappers as {@link #PROP_SB_PATH}; may be null
+   * @param outputPath job output directory
+   * @param aBlockRows value stored under {@link #PROP_AROWBLOCK_SIZE}
+   * @param minSplitSize minimum input split size; applied only when positive
+   * @param k value stored under {@link #PROP_K}
+   * @param p value stored under {@link #PROP_P}
+   * @param seed value stored under {@link #PROP_OMEGA_SEED}
+   * @param numReduceTasks not used: the job always runs with zero reducers
+   * @throws IOException if the job does not complete successfully
+   */
+  public static void run(Configuration conf,
+                         Path[] inputPaths,
+                         Path sbPath,
+                         Path outputPath,
+                         int aBlockRows,
+                         int minSplitSize,
+                         int k,
+                         int p,
+                         long seed,
+                         int numReduceTasks)
+    throws ClassNotFoundException, InterruptedException, IOException {
+
+    // named outputs have to be registered on the old-API conf before the Job is created from it
+    JobConf oldApiJob = new JobConf(conf);
+    MultipleOutputs.addNamedOutput(oldApiJob,
+                                   OUTPUT_QHAT,
+                                   org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
+                                   SplitPartitionedWritable.class,
+                                   DenseBlockWritable.class);
+    MultipleOutputs.addNamedOutput(oldApiJob,
+                                   OUTPUT_RHAT,
+                                   org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
+                                   SplitPartitionedWritable.class,
+                                   VectorWritable.class);
+
+    Job job = new Job(oldApiJob);
+    job.setJobName("Q-job");
+    job.setJarByClass(QJob.class);
+
+    // input side
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    FileInputFormat.setInputPaths(job, inputPaths);
+    if (minSplitSize > 0) {
+      FileInputFormat.setMinInputSplitSize(job, minSplitSize);
+    }
+
+    // output side: block-compressed sequence files
+    FileOutputFormat.setOutputPath(job, outputPath);
+    FileOutputFormat.setCompressOutput(job, true);
+    FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
+    SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
+
+    job.setMapOutputKeyClass(SplitPartitionedWritable.class);
+    job.setMapOutputValueClass(VectorWritable.class);
+    job.setOutputKeyClass(SplitPartitionedWritable.class);
+    job.setOutputValueClass(VectorWritable.class);
+
+    job.setMapperClass(QMapper.class);
+
+    // parameters read back by QMapper.setup()
+    Configuration jobConf = job.getConfiguration();
+    jobConf.setInt(PROP_AROWBLOCK_SIZE, aBlockRows);
+    jobConf.setLong(PROP_OMEGA_SEED, seed);
+    jobConf.setInt(PROP_K, k);
+    jobConf.setInt(PROP_P, p);
+    if (sbPath != null) {
+      jobConf.set(PROP_SB_PATH, sbPath.toString());
+    }
+
+    // number of reduce tasks doesn't matter: nothing is actually sent to reducers
+    job.setNumReduceTasks(0 /* numReduceTasks */);
+
+    job.submit();
+    job.waitForCompletion(false);
+
+    if (job.isSuccessful()) {
+      return;
+    }
+    throw new IOException("Q job unsuccessful.");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java
----------------------------------------------------------------------
diff --git 
a/mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java 
b/mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java
new file mode 100644
index 0000000..7b4fefb
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java
@@ -0,0 +1,201 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.stochasticsvd;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.hadoop.MatrixColumnMeansJob;
+
+/**
+ * Mahout CLI adapter for SSVDSolver
+ */
+public class SSVDCli extends AbstractJob {
+
+  @Override
+  public int run(String[] args) throws Exception {
+    addInputOption();
+    addOutputOption();
+    addOption("rank", "k", "decomposition rank", true);
+    addOption("oversampling", "p", "oversampling", String.valueOf(15));
+    addOption("blockHeight",
+              "r",
+              "Y block height (must be > (k+p))",
+              String.valueOf(10000));
+    addOption("outerProdBlockHeight",
+              "oh",
+              "block height of outer products during multiplication, increase 
for sparse inputs",
+              String.valueOf(30000));
+    addOption("abtBlockHeight",
+              "abth",
+              "block height of Y_i in ABtJob during AB' multiplication, 
increase for extremely sparse inputs",
+              String.valueOf(200000));
+    addOption("minSplitSize", "s", "minimum split size", String.valueOf(-1));
+    addOption("computeU", "U", "compute U (true/false)", String.valueOf(true));
+    addOption("uHalfSigma",
+              "uhs",
+              "Compute U * Sigma^0.5",
+              String.valueOf(false));
+    addOption("uSigma", "us", "Compute U * Sigma", String.valueOf(false));
+    addOption("computeV", "V", "compute V (true/false)", String.valueOf(true));
+    addOption("vHalfSigma",
+              "vhs",
+              "compute V * Sigma^0.5",
+              String.valueOf(false));
+    addOption("reduceTasks",
+              "t",
+              "number of reduce tasks (where applicable)",
+              true);
+    addOption("powerIter",
+              "q",
+              "number of additional power iterations (0..2 is good)",
+              String.valueOf(0));
+    addOption("broadcast",
+              "br",
+              "whether use distributed cache to broadcast matrices wherever 
possible",
+              String.valueOf(true));
+    addOption("pca",
+              "pca",
+              "run in pca mode: compute column-wise mean and subtract from 
input",
+              String.valueOf(false));
+    addOption("pcaOffset",
+              "xi",
+              "path(glob) of external pca mean (optional, dont compute, use 
external mean");
+    addOption(DefaultOptionCreator.overwriteOption().create());
+
+    Map<String, List<String>> pargs = parseArguments(args);
+    if (pargs == null) {
+      return -1;
+    }
+
+    int k = Integer.parseInt(getOption("rank"));
+    int p = Integer.parseInt(getOption("oversampling"));
+    int r = Integer.parseInt(getOption("blockHeight"));
+    int h = Integer.parseInt(getOption("outerProdBlockHeight"));
+    int abh = Integer.parseInt(getOption("abtBlockHeight"));
+    int q = Integer.parseInt(getOption("powerIter"));
+    int minSplitSize = Integer.parseInt(getOption("minSplitSize"));
+    boolean computeU = Boolean.parseBoolean(getOption("computeU"));
+    boolean computeV = Boolean.parseBoolean(getOption("computeV"));
+    boolean cUHalfSigma = Boolean.parseBoolean(getOption("uHalfSigma"));
+    boolean cUSigma = Boolean.parseBoolean(getOption("uSigma"));
+    boolean cVHalfSigma = Boolean.parseBoolean(getOption("vHalfSigma"));
+    int reduceTasks = Integer.parseInt(getOption("reduceTasks"));
+    boolean broadcast = Boolean.parseBoolean(getOption("broadcast"));
+    String xiPathStr = getOption("pcaOffset");
+    Path xiPath = xiPathStr == null ? null : new Path(xiPathStr);
+    boolean pca = Boolean.parseBoolean(getOption("pca")) || xiPath != null;
+
+    boolean overwrite = hasOption(DefaultOptionCreator.OVERWRITE_OPTION);
+
+    Configuration conf = getConf();
+    if (conf == null) {
+      throw new IOException("No Hadoop configuration present");
+    }
+
+    Path[] inputPaths = { getInputPath() };
+    Path tempPath = getTempPath();
+    FileSystem fs = FileSystem.get(getTempPath().toUri(), conf);
+
+    // housekeeping
+    if (overwrite) {
+      // clear the output path
+      HadoopUtil.delete(getConf(), getOutputPath());
+      // clear the temp path
+      HadoopUtil.delete(getConf(), getTempPath());
+    }
+
+    fs.mkdirs(getOutputPath());
+
+    // MAHOUT-817
+    if (pca && xiPath == null) {
+      xiPath = new Path(tempPath, "xi");
+      if (overwrite) {
+        fs.delete(xiPath, true);
+      }
+      MatrixColumnMeansJob.run(conf, inputPaths[0], xiPath);
+    }
+
+    SSVDSolver solver =
+      new SSVDSolver(conf,
+                     inputPaths,
+                     new Path(tempPath, "ssvd"),
+                     r,
+                     k,
+                     p,
+                     reduceTasks);
+
+    solver.setMinSplitSize(minSplitSize);
+    solver.setComputeU(computeU);
+    solver.setComputeV(computeV);
+    solver.setcUHalfSigma(cUHalfSigma);
+    solver.setcVHalfSigma(cVHalfSigma);
+    solver.setcUSigma(cUSigma);
+    solver.setOuterBlockHeight(h);
+    solver.setAbtBlockHeight(abh);
+    solver.setQ(q);
+    solver.setBroadcast(broadcast);
+    solver.setOverwrite(overwrite);
+
+    if (xiPath != null) {
+      solver.setPcaMeanPath(new Path(xiPath, "part-*"));
+    }
+
+    solver.run();
+
+    Vector svalues = solver.getSingularValues().viewPart(0, k);
+    SSVDHelper.saveVector(svalues, getOutputPath("sigma"), conf);
+
+    if (computeU && !fs.rename(new Path(solver.getUPath()), getOutputPath())) {
+      throw new IOException("Unable to move U results to the output path.");
+    }
+    if (cUHalfSigma
+        && !fs.rename(new Path(solver.getuHalfSigmaPath()), getOutputPath())) {
+      throw new IOException("Unable to move U*Sigma^0.5 results to the output 
path.");
+    }
+    if (cUSigma
+        && !fs.rename(new Path(solver.getuSigmaPath()), getOutputPath())) {
+      throw new IOException("Unable to move U*Sigma results to the output 
path.");
+    }
+    if (computeV && !fs.rename(new Path(solver.getVPath()), getOutputPath())) {
+      throw new IOException("Unable to move V results to the output path.");
+    }
+    if (cVHalfSigma
+        && !fs.rename(new Path(solver.getvHalfSigmaPath()), getOutputPath())) {
+      throw new IOException("Unable to move V*Sigma^0.5 results to the output 
path.");
+    }
+
+    // Delete the temp path on exit
+    fs.deleteOnExit(getTempPath());
+
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new SSVDCli(), args);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDHelper.java
----------------------------------------------------------------------
diff --git 
a/mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDHelper.java 
b/mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDHelper.java
new file mode 100644
index 0000000..c585f33
--- /dev/null
+++ 
b/mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDHelper.java
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.stochasticsvd;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+import com.google.common.io.Closeables;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.IOUtils;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterator;
+import 
org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterator;
+import org.apache.mahout.math.DenseMatrix;
+import org.apache.mahout.math.DenseSymmetricMatrix;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Matrix;
+import org.apache.mahout.math.UpperTriangular;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.function.Functions;
+
+/**
+ * set of small file manipulation helpers.
+ */
+public final class SSVDHelper {
+
+  private static final Pattern OUTPUT_FILE_PATTERN = 
Pattern.compile("(\\w+)-(m|r)-(\\d+)(\\.\\w+)?"); // <name>-<m|r>-<digits>[.<ext>]; group 3 is parsed as the partition number
+
+  private SSVDHelper() { // static helpers only; not instantiable
+  }
+
+  /**
+   * Loads the one and only vector stored under an hdfs path, which may be
+   * given as a glob.
+   *
+   * @param glob file or glob to read
+   * @param conf configuration
+   * @return the vector that was read
+   * @throws IOException if the input holds no vector, or more than one
+   */
+  static Vector loadVector(Path glob, Configuration conf) throws IOException {
+
+    SequenceFileDirValueIterator<VectorWritable> values =
+      new SequenceFileDirValueIterator<>(glob, PathType.GLOB, null, null, true, conf);
+
+    try {
+      if (!values.hasNext()) {
+        throw new IOException("Empty input while reading vector");
+      }
+      VectorWritable first = values.next();
+
+      // anything beyond the first record means this is not a single-vector file
+      boolean trailingData = values.hasNext();
+      if (trailingData) {
+        throw new IOException("Unexpected data after the end of vector file");
+      }
+
+      return first.get();
+
+    } finally {
+      // read side only: a failure while closing is swallowed
+      Closeables.close(values, true);
+    }
+  }
+
+  /**
+   * Saves a single vector into an hdfs sequence file, keyed by an empty
+   * {@link IntWritable}.
+   *
+   * @param v vector to save
+   * @param vectorFilePath file to write
+   * @param conf configuration
+   * @throws IOException if writing or closing the file fails
+   */
+  public static void saveVector(Vector v,
+                                Path vectorFilePath,
+                                Configuration conf) throws IOException {
+    VectorWritable vw = new VectorWritable(v);
+    // resolve the file system from the target path itself, so that a path on a
+    // file system other than the configured default one can be written too
+    FileSystem fs = vectorFilePath.getFileSystem(conf);
+    /*
+     * this is a writer, no quiet close please: try-with-resources propagates a
+     * failure to close, so we bail out on an incomplete close.
+     */
+    try (SequenceFile.Writer w = new SequenceFile.Writer(fs,
+        conf,
+        vectorFilePath,
+        IntWritable.class,
+        VectorWritable.class)) {
+      w.append(new IntWritable(), vw);
+    }
+  }
+
+  /**
+   * Sniffs the label (key) type of the input by opening the first sequence
+   * file that can be found under the given paths.
+   *
+   * @param inputPath input files, directories or globs, probed in order
+   * @param conf configuration
+   * @return the key class declared by the first sequence file found
+   * @throws IOException if none of the paths yields a file to open
+   */
+  static Class<? extends Writable> sniffInputLabelType(Path[] inputPath,
+                                                       Configuration conf)
+    throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    for (Path p : inputPath) {
+      FileStatus[] fstats = fs.globStatus(p);
+      if (fstats == null || fstats.length == 0) {
+        continue;
+      }
+
+      FileStatus firstSeqFile;
+      if (fstats[0].isDir()) {
+        FileStatus[] children = fs.listStatus(fstats[0].getPath(), PathFilters.logsCRCFilter());
+        if (children == null || children.length == 0) {
+          // a directory without data files: try the next input path instead of
+          // failing with an ArrayIndexOutOfBoundsException
+          continue;
+        }
+        firstSeqFile = children[0];
+      } else {
+        firstSeqFile = fstats[0];
+      }
+
+      SequenceFile.Reader r = null;
+      try {
+        r = new SequenceFile.Reader(fs, firstSeqFile.getPath(), conf);
+        return r.getKeyClass().asSubclass(Writable.class);
+      } finally {
+        // read side only: a failure while closing is swallowed
+        Closeables.close(r, true);
+      }
+    }
+    throw new IOException("Unable to open input files to determine input label type.");
+  }
+
+  /**
+   * Orders job output files by the partition number embedded in their names
+   * (third group of {@code OUTPUT_FILE_PATTERN}). The comparator keeps no
+   * mutable state, so one instance can be shared between threads.
+   */
+  static final Comparator<FileStatus> PARTITION_COMPARATOR =
+    new Comparator<FileStatus>() {
+
+      @Override
+      public int compare(FileStatus o1, FileStatus o2) {
+        int p1 = partitionOf(o1);
+        int p2 = partitionOf(o2);
+        return Integer.compare(p1, p2);
+      }
+
+      /** Extracts the partition number from the file name or rejects the file. */
+      private int partitionOf(FileStatus status) {
+        // a fresh matcher per call: Matcher instances are not thread-safe
+        Matcher matcher = OUTPUT_FILE_PATTERN.matcher(status.getPath().getName());
+        if (!matcher.matches()) {
+          throw new IllegalArgumentException("Unexpected file name, unable to deduce partition #:"
+                                               + status.getPath());
+        }
+        return Integer.parseInt(matcher.group(3));
+      }
+
+    };
+
+  /**
+   * Iterates over the rows of a distributed row matrix stored as sequence
+   * files, visiting the files in partition order.
+   *
+   * @param fs not used by this method; kept so existing callers keep compiling
+   * @param glob FS glob selecting the matrix files
+   * @param conf configuration
+   * @param closeables receives the underlying file iterator, which the caller
+   *                   is expected to close once iteration is done
+   * @return iterator over (row label, row vector) pairs
+   */
+  public static Iterator<Pair<Writable, Vector>> drmIterator(FileSystem fs, Path glob, Configuration conf,
+                                                             Deque<Closeable> closeables)
+    throws IOException {
+    SequenceFileDirIterator<Writable, VectorWritable> ret =
+      new SequenceFileDirIterator<>(glob,
+                                    PathType.GLOB,
+                                    PathFilters.logsCRCFilter(),
+                                    PARTITION_COMPARATOR,
+                                    true,
+                                    conf);
+    closeables.addFirst(ret);
+    return Iterators.transform(ret, new Function<Pair<Writable, VectorWritable>, Pair<Writable, Vector>>() {
+      @Override
+      public Pair<Writable, Vector> apply(Pair<Writable, VectorWritable> p) {
+        // unwrap the writable; typed constructor instead of the raw Pair
+        return new Pair<>(p.getFirst(), p.getSecond().get());
+      }
+    });
+  }
+
+  /**
+   * Helper capability to load a distributed row matrix into an in-core dense
+   * matrix (mainly to support tests).
+   *
+   * @param fs   filesystem
+   * @param glob FS glob
+   * @param conf configuration
+   * @return the dense matrix, or {@code null} if the glob yields no rows
+   */
+  public static DenseMatrix drmLoadAsDense(FileSystem fs, Path glob, Configuration conf) throws IOException {
+
+    Deque<Closeable> toClose = new ArrayDeque<>();
+    try {
+      List<double[]> rowData = new ArrayList<>();
+      Iterator<Pair<Writable, Vector>> rowIter = drmIterator(fs, glob, conf, toClose);
+      while (rowIter.hasNext()) {
+        Vector row = rowIter.next().getSecond();
+        int width = row.size();
+        double[] values = new double[width];
+        if (!row.isDense()) {
+          // sparse row: only the non-zero entries need copying
+          for (Vector.Element nonZero : row.nonZeroes()) {
+            values[nonZero.index()] = nonZero.get();
+          }
+        } else {
+          for (int col = 0; col < width; col++) {
+            values[col] = row.getQuick(col);
+          }
+        }
+        rowData.add(values);
+      }
+      if (rowData.isEmpty()) {
+        return null;
+      }
+      double[][] raw = rowData.toArray(new double[rowData.size()][]);
+      return new DenseMatrix(raw);
+    } finally {
+      IOUtils.close(toClose);
+    }
+  }
+
+  /**
+   * Loads every upper triangular matrix matched by {@code glob}, adds them up
+   * element-wise and wraps the total as a symmetric matrix.
+   *
+   * @return the sum of the upper triangular inputs, or {@code null} if
+   *         {@link #loadAndSumUpVectors(Path, Configuration)} found nothing to sum.
+   */
+  public static DenseSymmetricMatrix loadAndSumUpperTriangularMatricesAsSymmetric(Path glob, Configuration conf)
+    throws IOException {
+    Vector packedSum = loadAndSumUpVectors(glob, conf);
+    if (packedSum == null) {
+      return null;
+    }
+    return new DenseSymmetricMatrix(packedSum);
+  }
+
+  /**
+   * Reads all vectors from the files selected by {@code glob} and accumulates
+   * them into a single dense vector.
+   *
+   * @return sum of all vectors in the different files specified by glob, or
+   *         {@code null} if no vector was read
+   */
+  public static Vector loadAndSumUpVectors(Path glob, Configuration conf)
+    throws IOException {
+
+    SequenceFileDirValueIterator<VectorWritable> values =
+      new SequenceFileDirValueIterator<>(glob, PathType.GLOB, null, PARTITION_COMPARATOR, true, conf);
+
+    try {
+      Vector sum = null;
+      while (values.hasNext()) {
+        Vector addend = values.next().get();
+        if (sum != null) {
+          sum.assign(addend, Functions.PLUS);
+        } else {
+          // the first vector seeds the accumulator as a dense copy
+          sum = new DenseVector(addend);
+        }
+      }
+      return sum;
+    } finally {
+      Closeables.close(values, true);
+    }
+
+  }
+
+  /**
+   * Load only one upper triangular matrix and issue error if mroe than one is
+   * found.
+   */
+  public static UpperTriangular loadUpperTriangularMatrix(Path glob, 
Configuration conf) throws IOException {
+
+    /*
+     * there still may be more than one file in glob and only one of them must
+     * contain the matrix.
+     */
+
+    try (SequenceFileDirValueIterator<VectorWritable> iter = new 
SequenceFileDirValueIterator<>(glob,
+        PathType.GLOB,
+        null,
+        null,
+        true,
+        conf)) {
+      if (!iter.hasNext()) {
+        throw new IOException("No triangular matrices found");
+      }
+      Vector v = iter.next().get();
+      UpperTriangular result = new UpperTriangular(v);
+      if (iter.hasNext()) {
+        throw new IOException("Unexpected overrun in upper triangular matrix 
files");
+      }
+      return result;
+
+    }
+  }
+
+  /**
+   * Copies a Mahout matrix into a row-major {@code double[][]} for use by 3rd
+   * party solvers. The values member of
+   * {@link org.apache.mahout.math.DenseMatrix} is fully encapsulated at this
+   * point, so the copy has to go through the abstract element-wise accessor.
+   */
+  public static double[][] extractRawData(Matrix m) {
+    final int rowCount = m.numRows();
+    final int colCount = m.numCols();
+    double[][] raw = new double[rowCount][colCount];
+    for (int r = 0; r < rowCount; r++) {
+      double[] rawRow = raw[r];
+      for (int c = 0; c < colCount; c++) {
+        rawRow[c] = m.getQuick(r, c);
+      }
+    }
+    return raw;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDSolver.java
----------------------------------------------------------------------
diff --git 
a/mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDSolver.java 
b/mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDSolver.java
new file mode 100644
index 0000000..94be450
--- /dev/null
+++ 
b/mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDSolver.java
@@ -0,0 +1,662 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.stochasticsvd;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.IOUtils;
+import org.apache.mahout.common.RandomUtils;
+import org.apache.mahout.math.*;
+import org.apache.mahout.math.function.Functions;
+import org.apache.mahout.math.solver.EigenDecomposition;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Deque;
+import java.util.Random;
+
+/**
+ * Stochastic SVD solver (API class).
+ * <p>
+ * Implementation details are in my working notes in MAHOUT-376
+ * (https://issues.apache.org/jira/browse/MAHOUT-376).
+ * <p>
+ * As of the time of this writing, I don't have benchmarks for this method in
+ * comparison to other methods. However, non-hadoop differentiating
+ * characteristics of this method are thought to be:
+ * <UL>
+ * <LI>"faster": precision is traded off in favor of speed. However, there is a
+ * lever in terms of the "oversampling parameter" p. Higher values of p produce
+ * better precision but trade off speed (and minimum RAM requirement). This
+ * also means that this method is almost guaranteed to be less precise than
+ * Lanczos unless full rank SVD decomposition is sought.
+ * <LI>"more scale" -- can presumably take on larger problems than Lanczos one
+ * (not confirmed by benchmark at this time)
+ * </UL>
+ * <p>
+ * Specifically in regards to this implementation, <i>I think</i> a couple of
+ * other differentiating points are:
+ * <UL>
+ * <LI>no need to specify input matrix height or width in command line, it is
+ * what it gets to be.
+ * <LI>supports any Writable as DRM row keys and copies them to the
+ * corresponding rows of the U matrix;
+ * <LI>can request U or V or U<sub>&sigma;</sub>=U* &Sigma;<sup>0.5</sup> or
+ * V<sub>&sigma;</sub>=V* &Sigma;<sup>0.5</sup>, none of which would require a
+ * pass over input A, and these jobs are parallel map-only jobs.
+ * </UL>
+ * <p>
+ * This class is the central public API for the SSVD solver. The use pattern is
+ * as follows:
+ * <UL>
+ * <LI>create the solver using the constructor, supplying computation
+ * parameters.
+ * <LI>set optional parameters through setter methods.
+ * <LI>call {@link #run()}.
+ * <LI> {@link #getUPath()} (if computed) returns the path to the directory
+ * containing m x k U matrix file(s).
+ * <LI> {@link #getVPath()} (if computed) returns the path to the directory
+ * containing n x k V matrix file(s).
+ * </UL>
+ */
+public final class SSVDSolver {
+
+  /**
+   * Upper bound on the number of reducers of the last Bt job, so that the
+   * front end does not have to sum up too many partial BB' outputs.
+   */
+  private static final int MAX_FINAL_BT_REDUCERS = 1000;
+
+  // results, populated by run()
+  private Vector svalues;
+  private boolean computeU = true;
+  private boolean computeV = true;
+  private String uPath;
+  private String vPath;
+  private String uSigmaPath;
+  private String uHalfSigmaPath;
+  private String vSigmaPath;
+  private String vHalfSigmaPath;
+  private int outerBlockHeight = 30000;
+  private int abtBlockHeight = 200000;
+
+  // configured stuff
+  private final Configuration conf;
+  private final Path[] inputPath;
+  private final Path outputPath;
+  private final int ablockRows;
+  private final int k;
+  private final int p;
+  private int q;
+  private final int reduceTasks;
+  private int minSplitSize = -1;
+  private boolean cUHalfSigma;
+  private boolean cUSigma;
+  private boolean cVHalfSigma;
+  private boolean cVSigma;
+  private boolean overwrite;
+  private boolean broadcast = true;
+  private Path pcaMeanPath;
+
+  // for debugging
+  private long omegaSeed;
+
+  /**
+   * Creates a new SSVD solver. Required parameters are passed to the
+   * constructor to ensure they are set. Optional parameters can be set using
+   * setters.
+   *
+   * @param conf        hadoop configuration
+   * @param inputPath   Input path (should be compatible with DistributedRowMatrix as of
+   *                    the time of this writing).
+   * @param outputPath  Output path containing U, V and singular values vector files.
+   * @param ablockRows  The vertical height of a q-block (bigger values require more memory
+   *                    in mappers + perhaps larger {@code minSplitSize} values)
+   * @param k           desired rank
+   * @param p           SSVD oversampling parameter
+   * @param reduceTasks Number of reduce tasks (where applicable)
+   */
+  public SSVDSolver(Configuration conf,
+                    Path[] inputPath,
+                    Path outputPath,
+                    int ablockRows,
+                    int k,
+                    int p,
+                    int reduceTasks) {
+    this.conf = conf;
+    this.inputPath = inputPath;
+    this.outputPath = outputPath;
+    this.ablockRows = ablockRows;
+    this.k = k;
+    this.p = p;
+    this.reduceTasks = reduceTasks;
+  }
+
+  /**
+   * @return the number of additional power iterations, see {@link #setQ(int)}
+   */
+  public int getQ() {
+    return q;
+  }
+
+  /**
+   * Sets q, the amount of additional power iterations used to increase
+   * precision. The intended range is 0..2; the value is not validated here.
+   * Defaults to 0.
+   *
+   * @param q number of power iterations
+   */
+  public void setQ(int q) {
+    this.q = q;
+  }
+
+  /**
+   * The setting controlling whether to compute U matrix of low rank SSVD.
+   * Default true.
+   *
+   * @param val true if we want to output the U matrix
+   */
+  public void setComputeU(boolean val) {
+    computeU = val;
+  }
+
+  /**
+   * Setting controlling whether to compute V matrix of low-rank SSVD.
+   *
+   * @param val true if we want to output V matrix. Default is true.
+   */
+  public void setComputeV(boolean val) {
+    computeV = val;
+  }
+
+  /**
+   * @param cUHat whether to produce U*Sigma^0.5 as well (default false)
+   */
+  public void setcUHalfSigma(boolean cUHat) {
+    this.cUHalfSigma = cUHat;
+  }
+
+  /**
+   * @param cVHat whether to produce V*Sigma^0.5 as well (default false)
+   */
+  public void setcVHalfSigma(boolean cVHat) {
+    this.cVHalfSigma = cVHat;
+  }
+
+  /**
+   * @param cUSigma whether to produce U*Sigma output as well (default false)
+   */
+  public void setcUSigma(boolean cUSigma) {
+    this.cUSigma = cUSigma;
+  }
+
+  /**
+   * @param cVSigma whether to produce V*Sigma output as well (default false)
+   */
+  public void setcVSigma(boolean cVSigma) {
+    this.cVSigma = cVSigma;
+  }
+
+  /**
+   * Sometimes, if requested A blocks become larger than a split, we may need to
+   * use that to ensure at least k+p rows of A get into a split. This is a
+   * requirement necessary to obtain orthonormalized Q blocks of SSVD.
+   * Defaults to -1.
+   *
+   * @param size the minimum split size to use
+   */
+  public void setMinSplitSize(int size) {
+    minSplitSize = size;
+  }
+
+  /**
+   * This contains k+p singular values resulted from the solver run.
+   *
+   * @return singular values (largest to smallest), or null if {@link #run()}
+   *         has not produced them yet
+   */
+  public Vector getSingularValues() {
+    return svalues;
+  }
+
+  /**
+   * Returns U path (if computation was requested and successful).
+   *
+   * @return U output hdfs path, or null if computation was not completed for
+   *         whatever reason.
+   */
+  public String getUPath() {
+    return uPath;
+  }
+
+  /**
+   * Returns V path (if computation was requested and successful).
+   *
+   * @return V output hdfs path, or null if computation was not completed for
+   *         whatever reason.
+   */
+  public String getVPath() {
+    return vPath;
+  }
+
+  /**
+   * @return U*Sigma output path, or null if that job was not requested or has
+   *         not completed.
+   */
+  public String getuSigmaPath() {
+    return uSigmaPath;
+  }
+
+  /**
+   * @return U*Sigma^0.5 output path, or null if that job was not requested or
+   *         has not completed.
+   */
+  public String getuHalfSigmaPath() {
+    return uHalfSigmaPath;
+  }
+
+  /**
+   * @return V*Sigma output path, or null if that job was not requested or has
+   *         not completed.
+   */
+  public String getvSigmaPath() {
+    return vSigmaPath;
+  }
+
+  /**
+   * @return V*Sigma^0.5 output path, or null if that job was not requested or
+   *         has not completed.
+   */
+  public String getvHalfSigmaPath() {
+    return vHalfSigmaPath;
+  }
+
+  /**
+   * @return whether the output folder is deleted before the run, see
+   *         {@link #setOverwrite(boolean)}
+   */
+  public boolean isOverwrite() {
+    return overwrite;
+  }
+
+  /**
+   * If true, the driver recursively deletes the output folder before running.
+   * Default is false.
+   *
+   * @param overwrite whether to clean the output folder first
+   */
+  public void setOverwrite(boolean overwrite) {
+    this.overwrite = overwrite;
+  }
+
+  /**
+   * @return the outer block height, see {@link #setOuterBlockHeight(int)}
+   */
+  public int getOuterBlockHeight() {
+    return outerBlockHeight;
+  }
+
+  /**
+   * The height of outer blocks during Q'A multiplication. Higher values allow
+   * to produce less keys for combining and shuffle and sort therefore somewhat
+   * improving running time; but require larger blocks to be formed in RAM (so
+   * setting this too high can lead to OOM). Default is 30,000.
+   *
+   * @param outerBlockHeight outer block height, in rows
+   */
+  public void setOuterBlockHeight(int outerBlockHeight) {
+    this.outerBlockHeight = outerBlockHeight;
+  }
+
+  /**
+   * @return the Y_i block height, see {@link #setAbtBlockHeight(int)}
+   */
+  public int getAbtBlockHeight() {
+    return abtBlockHeight;
+  }
+
+  /**
+   * The block height of Y_i during power iterations. It is probably important
+   * to set it higher than default 200,000 for extremely sparse inputs and when
+   * more ram is available. y_i block height and ABt job would occupy approx.
+   * abtBlockHeight x (k+p) x sizeof (double) (as dense).
+   *
+   * @param abtBlockHeight Y_i block height, in rows
+   */
+  public void setAbtBlockHeight(int abtBlockHeight) {
+    this.abtBlockHeight = abtBlockHeight;
+  }
+
+  /**
+   * @return whether broadcasting is enabled, see {@link #setBroadcast(boolean)}
+   */
+  public boolean isBroadcast() {
+    return broadcast;
+  }
+
+  /**
+   * If this property is true, use the DistributedCache mechanism to broadcast
+   * some stuff around. May improve efficiency. Default is true.
+   *
+   * @param broadcast whether to enable broadcasting
+   */
+  public void setBroadcast(boolean broadcast) {
+    this.broadcast = broadcast;
+  }
+
+  /**
+   * Optional. Single-vector file path for a vector (aka xi in MAHOUT-817
+   * working notes) to be subtracted from each row of input.
+   * <p>
+   * A brute force approach would turn the input into a dense input, which is
+   * often not very desirable. By supplying this offset to SSVD solver, we can
+   * avoid most of that overhead due to increased input density.
+   * <p>
+   * The vector size for this offset is n (width of A input). In PCA and R this
+   * is known as "column means", but in this case it can be any offset of row
+   * vectors of course to propagate into SSVD solution.
+   *
+   * @return the offset vector path, or null if none was set
+   */
+  public Path getPcaMeanPath() {
+    return pcaMeanPath;
+  }
+
+  /**
+   * @param pcaMeanPath path of the offset vector, see {@link #getPcaMeanPath()}
+   */
+  public void setPcaMeanPath(Path pcaMeanPath) {
+    this.pcaMeanPath = pcaMeanPath;
+  }
+
+  /**
+   * @return the seed drawn for the Omega matrix by the latest {@link #run()}
+   *         (for debugging)
+   */
+  long getOmegaSeed() {
+    return omegaSeed;
+  }
+
+  /**
+   * Runs all SSVD jobs.
+   *
+   * @throws IOException if an I/O condition occurs, if required intermediate
+   *         outputs cannot be loaded, or if the calling thread is interrupted
+   *         (in which case the thread's interrupt status is restored).
+   */
+  public void run() throws IOException {
+
+    // NOTE(review): nothing in this method currently registers a resource here,
+    // so the close in the finally block is a no-op; kept as a cleanup hook.
+    Deque<Closeable> closeables = Lists.newLinkedList();
+    try {
+      Class<? extends Writable> labelType =
+        SSVDHelper.sniffInputLabelType(inputPath, conf);
+      FileSystem fs = FileSystem.get(conf);
+
+      // NOTE: several of these locals intentionally shadow the String result
+      // fields of the same name; the fields are only assigned (via "this.")
+      // once the corresponding job has completed.
+      Path qPath = new Path(outputPath, "Q-job");
+      Path btPath = new Path(outputPath, "Bt-job");
+      Path uHatPath = new Path(outputPath, "UHat");
+      Path svPath = new Path(outputPath, "Sigma");
+      Path uPath = new Path(outputPath, "U");
+      Path uSigmaPath = new Path(outputPath, "USigma");
+      Path uHalfSigmaPath = new Path(outputPath, "UHalfSigma");
+      Path vPath = new Path(outputPath, "V");
+      Path vHalfSigmaPath = new Path(outputPath, "VHalfSigma");
+      Path vSigmaPath = new Path(outputPath, "VSigma");
+
+      Path pcaBasePath = new Path(outputPath, "pca");
+
+      if (overwrite) {
+        fs.delete(outputPath, true);
+      }
+
+      if (pcaMeanPath != null) {
+        fs.mkdirs(pcaBasePath);
+      }
+      Random rnd = RandomUtils.getRandom();
+      omegaSeed = rnd.nextLong();
+
+      /*
+       * if we work with pca offset, we need to precompute s_b0 aka s_omega for
+       * jobs to use.
+       */
+      Path sbPath = null;
+      double xisquaredlen = 0.0;
+      if (pcaMeanPath != null) {
+        /*
+         * compute s_b0 if pca offset present.
+         *
+         * Just in case, we treat xi path as a possible reduce or otherwise
+         * multiple task output that we assume we need to sum up partial
+         * components. If it is just one file, it will work too.
+         */
+
+        Vector xi = SSVDHelper.loadAndSumUpVectors(pcaMeanPath, conf);
+        if (xi == null) {
+          throw new IOException(String.format("unable to load mean path xi from %s.",
+                                              pcaMeanPath.toString()));
+        }
+
+        xisquaredlen = xi.dot(xi);
+        Omega omega = new Omega(omegaSeed, k + p);
+        Vector s_b0 = omega.mutlithreadedTRightMultiply(xi);
+
+        sbPath = new Path(pcaBasePath, "somega.seq");
+        SSVDHelper.saveVector(s_b0, sbPath, conf);
+      }
+
+      QJob.run(conf,
+               inputPath,
+               sbPath,
+               qPath,
+               ablockRows,
+               minSplitSize,
+               k,
+               p,
+               omegaSeed,
+               reduceTasks);
+
+      /*
+       * restrict number of reducers to a reasonable number so we don't have to
+       * run too many additions in the frontend when reconstructing BBt for the
+       * last B' and BB' computations. The user may not realize that and give a
+       * bit too many (I would be happy if that were ever the case though).
+       */
+      boolean noPowerIterations = q <= 0;
+      BtJob.run(conf,
+                inputPath,
+                qPath,
+                pcaMeanPath,
+                btPath,
+                minSplitSize,
+                k,
+                p,
+                outerBlockHeight,
+                noPowerIterations ? Math.min(MAX_FINAL_BT_REDUCERS, reduceTasks) : reduceTasks,
+                broadcast,
+                labelType,
+                noPowerIterations);
+
+      sbPath = new Path(btPath, BtJob.OUTPUT_SB + "-*");
+      Path sqPath = new Path(btPath, BtJob.OUTPUT_SQ + "-*");
+
+      // power iterations
+      for (int i = 0; i < q; i++) {
+        boolean lastIteration = i == q - 1;
+
+        qPath = new Path(outputPath, String.format("ABt-job-%d", i + 1));
+        Path btPathGlob = new Path(btPath, BtJob.OUTPUT_BT + "-*");
+        ABtDenseOutJob.run(conf,
+                           inputPath,
+                           btPathGlob,
+                           pcaMeanPath,
+                           sqPath,
+                           sbPath,
+                           qPath,
+                           ablockRows,
+                           minSplitSize,
+                           k,
+                           p,
+                           abtBlockHeight,
+                           reduceTasks,
+                           broadcast);
+
+        btPath = new Path(outputPath, String.format("Bt-job-%d", i + 1));
+
+        BtJob.run(conf,
+                  inputPath,
+                  qPath,
+                  pcaMeanPath,
+                  btPath,
+                  minSplitSize,
+                  k,
+                  p,
+                  outerBlockHeight,
+                  lastIteration ? Math.min(MAX_FINAL_BT_REDUCERS, reduceTasks) : reduceTasks,
+                  broadcast,
+                  labelType,
+                  lastIteration);
+        sbPath = new Path(btPath, BtJob.OUTPUT_SB + "-*");
+        sqPath = new Path(btPath, BtJob.OUTPUT_SQ + "-*");
+      }
+
+      Path bbtGlob = new Path(btPath, BtJob.OUTPUT_BBT + "-*");
+      DenseSymmetricMatrix bbt =
+        SSVDHelper.loadAndSumUpperTriangularMatricesAsSymmetric(bbtGlob, conf);
+      if (bbt == null) {
+        // the loader returns null when the glob yields nothing; fail with a
+        // diagnosable error instead of a NullPointerException below.
+        throw new IOException(String.format("unable to load BB' partial outputs from %s.", bbtGlob));
+      }
+
+      // convert bbt to something our eigensolver could understand
+      assert bbt.columnSize() == k + p;
+
+      /*
+       * we currently use a 3rd party in-core eigensolver. So we need just a
+       * dense array representation for it.
+       */
+      Matrix bbtSquare = new DenseMatrix(k + p, k + p);
+      bbtSquare.assign(bbt);
+
+      // MAHOUT-817
+      if (pcaMeanPath != null) {
+        Vector sq = SSVDHelper.loadAndSumUpVectors(sqPath, conf);
+        Vector sb = SSVDHelper.loadAndSumUpVectors(sbPath, conf);
+        if (sq == null || sb == null) {
+          throw new IOException(String.format("unable to load s_q/s_b vectors from %s and %s.", sqPath, sbPath));
+        }
+        Matrix mC = sq.cross(sb);
+
+        bbtSquare.assign(mC, Functions.MINUS);
+        bbtSquare.assign(mC.transpose(), Functions.MINUS);
+
+        Matrix outerSq = sq.cross(sq);
+        outerSq.assign(Functions.mult(xisquaredlen));
+        bbtSquare.assign(outerSq, Functions.PLUS);
+
+      }
+
+      EigenDecomposition eigen = new EigenDecomposition(bbtSquare);
+
+      Matrix uHat = eigen.getV();
+      svalues = eigen.getRealEigenvalues().clone();
+
+      // singular values are the square roots of the eigenvalues of BB'
+      svalues.assign(Functions.SQRT);
+
+      // save/redistribute UHat
+      fs.mkdirs(uHatPath);
+      uHatPath = new Path(uHatPath, "uhat.seq");
+      DistributedRowMatrixWriter.write(uHatPath, conf, uHat);
+
+      // save sigma.
+      svPath = new Path(svPath, "svalues.seq");
+      SSVDHelper.saveVector(svalues, svPath, conf);
+
+      // All requested U/V jobs are started first and awaited afterwards.
+      UJob ujob = null;
+      if (computeU) {
+        ujob = new UJob();
+        ujob.run(conf,
+                 new Path(btPath, BtJob.OUTPUT_Q + "-*"),
+                 uHatPath,
+                 svPath,
+                 uPath,
+                 k,
+                 reduceTasks,
+                 labelType,
+                 OutputScalingEnum.NOSCALING);
+        // actually this is map-only job anyway
+      }
+
+      UJob uhsjob = null;
+      if (cUHalfSigma) {
+        uhsjob = new UJob();
+        uhsjob.run(conf,
+                   new Path(btPath, BtJob.OUTPUT_Q + "-*"),
+                   uHatPath,
+                   svPath,
+                   uHalfSigmaPath,
+                   k,
+                   reduceTasks,
+                   labelType,
+                   OutputScalingEnum.HALFSIGMA);
+      }
+
+      UJob usjob = null;
+      if (cUSigma) {
+        usjob = new UJob();
+        usjob.run(conf,
+                  new Path(btPath, BtJob.OUTPUT_Q + "-*"),
+                  uHatPath,
+                  svPath,
+                  uSigmaPath,
+                  k,
+                  reduceTasks,
+                  labelType,
+                  OutputScalingEnum.SIGMA);
+      }
+
+      VJob vjob = null;
+      if (computeV) {
+        vjob = new VJob();
+        vjob.run(conf,
+                 new Path(btPath, BtJob.OUTPUT_BT + "-*"),
+                 pcaMeanPath,
+                 sqPath,
+                 uHatPath,
+                 svPath,
+                 vPath,
+                 k,
+                 reduceTasks,
+                 OutputScalingEnum.NOSCALING);
+      }
+
+      VJob vhsjob = null;
+      if (cVHalfSigma) {
+        vhsjob = new VJob();
+        vhsjob.run(conf,
+                   new Path(btPath, BtJob.OUTPUT_BT + "-*"),
+                   pcaMeanPath,
+                   sqPath,
+                   uHatPath,
+                   svPath,
+                   vHalfSigmaPath,
+                   k,
+                   reduceTasks,
+                   OutputScalingEnum.HALFSIGMA);
+      }
+
+      VJob vsjob = null;
+      if (cVSigma) {
+        vsjob = new VJob();
+        vsjob.run(conf,
+                  new Path(btPath, BtJob.OUTPUT_BT + "-*"),
+                  pcaMeanPath,
+                  sqPath,
+                  uHatPath,
+                  svPath,
+                  vSigmaPath,
+                  k,
+                  reduceTasks,
+                  OutputScalingEnum.SIGMA);
+      }
+
+      // Result paths are published only after the corresponding job finished.
+      if (ujob != null) {
+        ujob.waitForCompletion();
+        this.uPath = uPath.toString();
+      }
+      if (uhsjob != null) {
+        uhsjob.waitForCompletion();
+        this.uHalfSigmaPath = uHalfSigmaPath.toString();
+      }
+      if (usjob != null) {
+        usjob.waitForCompletion();
+        this.uSigmaPath = uSigmaPath.toString();
+      }
+      if (vjob != null) {
+        vjob.waitForCompletion();
+        this.vPath = vPath.toString();
+      }
+      if (vhsjob != null) {
+        vhsjob.waitForCompletion();
+        this.vHalfSigmaPath = vHalfSigmaPath.toString();
+      }
+      if (vsjob != null) {
+        vsjob.waitForCompletion();
+        this.vSigmaPath = vSigmaPath.toString();
+      }
+
+    } catch (InterruptedException exc) {
+      // restore the interrupt status so callers up the stack can still observe it
+      Thread.currentThread().interrupt();
+      throw new IOException("Interrupted", exc);
+    } catch (ClassNotFoundException exc) {
+      throw new IOException(exc);
+
+    } finally {
+      IOUtils.close(closeables);
+    }
+  }
+
+  /** Scaling applied to the U/V job output; passed to the U and V jobs. */
+  enum OutputScalingEnum {
+    NOSCALING, SIGMA, HALFSIGMA
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SparseRowBlockAccumulator.java
----------------------------------------------------------------------
diff --git 
a/mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SparseRowBlockAccumulator.java
 
b/mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SparseRowBlockAccumulator.java
new file mode 100644
index 0000000..081f55a
--- /dev/null
+++ 
b/mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SparseRowBlockAccumulator.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.stochasticsvd;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.mahout.math.Vector;
+
+/**
+ * Aggregates incoming rows into blocks based on the row number (long). Rows
+ * can be sparse (meaning they come perhaps in big intervals) and don't even
+ * have to come in any order, but they should be coming in proximity, so when
+ * we output a block key, we hopefully have aggregated more than one row by
+ * then.
+ * <P>
+ *
+ * If the block is sufficiently large to fit all rows that the mapper may
+ * produce, it will never hit a spill at all as we would already be plussing
+ * efficiently in the mapper.
+ * <P>
+ *
+ * Also, for sparse inputs it will be working especially well if transposed
+ * columns of the left side matrix and corresponding rows of the right side
+ * matrix experience sparsity in the same elements.
+ * <P>
+ *
+ */
+public class SparseRowBlockAccumulator implements
+    OutputCollector<Long, Vector>, Closeable {
+
+  private final int height;
+  private final OutputCollector<LongWritable, SparseRowBlockWritable> delegate;
+  private long currentBlockNum = -1;
+  private SparseRowBlockWritable block;
+  private final LongWritable blockKeyW = new LongWritable();
+
+  /**
+   * @param height   number of consecutive row indices mapped to one block
+   * @param delegate collector receiving each finished (block number, block) pair
+   */
+  public SparseRowBlockAccumulator(int height,
+                                   OutputCollector<LongWritable, SparseRowBlockWritable> delegate) {
+    this.height = height;
+    this.delegate = delegate;
+  }
+
+  /**
+   * Emits the current block to the delegate under the current block number
+   * and clears it; does nothing if there is no block or it holds no rows.
+   */
+  private void flushBlock() throws IOException {
+    if (block != null && block.getNumRows() != 0) {
+      blockKeyW.set(currentBlockNum);
+      delegate.collect(blockKeyW, block);
+      block.clear();
+    }
+  }
+
+  /**
+   * Flushes whatever was accumulated so far and makes {@code blockNum} the
+   * current block, allocating the block buffer on first use.
+   */
+  private void switchToBlock(long blockNum) throws IOException {
+    flushBlock();
+    if (block == null) {
+      block = new SparseRowBlockWritable(100);
+    }
+    currentBlockNum = blockNum;
+  }
+
+  /**
+   * Adds row {@code v} to the block that {@code rowIndex} falls into, flushing
+   * the previously accumulated block first when the block number changes.
+   */
+  @Override
+  public void collect(Long rowIndex, Vector v) throws IOException {
+
+    final long targetBlock = rowIndex / height;
+    final boolean sameBlock = targetBlock == currentBlockNum;
+
+    if (!sameBlock) {
+      switchToBlock(targetBlock);
+    }
+
+    final int rowWithinBlock = (int) (rowIndex % height);
+    block.plusRow(rowWithinBlock, v);
+  }
+
+  /** Flushes the last, possibly partial, block. */
+  @Override
+  public void close() throws IOException {
+    flushBlock();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SparseRowBlockWritable.java
----------------------------------------------------------------------
diff --git 
a/mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SparseRowBlockWritable.java
 
b/mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SparseRowBlockWritable.java
new file mode 100644
index 0000000..b7f5b94
--- /dev/null
+++ 
b/mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SparseRowBlockWritable.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.stochasticsvd;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.math.Varint;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.function.PlusMult;
+
+/**
+ * Block that supports accumulating rows and their sums, suitable for combiners
+ * and reducers of multiplication jobs.
+ */
+public class SparseRowBlockWritable implements Writable {
+
+  /** Row indices of the occupied slots; maintained in ascending order by plusRow/plusBlock. */
+  private int[] rowIndices;
+  /** Row vectors; rows[i] is the row stored under rowIndices[i]. */
+  private Vector[] rows;
+  /** Number of occupied slots in both arrays. */
+  private int numRows;
+
+  public SparseRowBlockWritable() {
+    this(10);
+  }
+
+  /**
+   * @param initialCapacity
+   *          number of row slots to allocate up front
+   */
+  public SparseRowBlockWritable(int initialCapacity) {
+    rowIndices = new int[initialCapacity];
+    rows = new Vector[initialCapacity];
+  }
+
+  /**
+   * @return the internal index array (not a copy); only the first
+   *         {@link #getNumRows()} entries are meaningful
+   */
+  public int[] getRowIndices() {
+    return rowIndices;
+  }
+
+  /**
+   * @return the internal row array (not a copy); only the first
+   *         {@link #getNumRows()} entries are meaningful
+   */
+  public Vector[] getRows() {
+    return rows;
+  }
+
+  /**
+   * Replaces the contents of this block with the serialized block read from
+   * the input. Existing arrays are reused when they are large enough.
+   *
+   * @throws IOException
+   *           on read failure, or if the decoded row count is negative
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int previousNumRows = numRows;
+    int incomingNumRows = Varint.readUnsignedVarInt(in);
+    if (incomingNumRows < 0) {
+      throw new IOException("Invalid row count in serialized block: " + incomingNumRows);
+    }
+    numRows = incomingNumRows;
+    if (rows == null || rows.length < numRows) {
+      rows = new Vector[numRows];
+      rowIndices = new int[numRows];
+    } else if (previousNumRows > numRows) {
+      // arrays are reused: drop references to rows of the previous content
+      // that the new, shorter content will not overwrite
+      Arrays.fill(rows, numRows, previousNumRows, null);
+    }
+    VectorWritable vw = new VectorWritable();
+    for (int i = 0; i < numRows; i++) {
+      rowIndices[i] = Varint.readUnsignedVarInt(in);
+      vw.readFields(in);
+      rows[i] = vw.get().clone();
+    }
+
+  }
+
+  /**
+   * Writes the row count followed by (row index, vector) pairs for each
+   * occupied slot.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    Varint.writeUnsignedVarInt(numRows, out);
+    VectorWritable vw = new VectorWritable();
+    for (int i = 0; i < numRows; i++) {
+      Varint.writeUnsignedVarInt(rowIndices[i], out);
+      vw.set(rows[i]);
+      vw.write(out);
+    }
+  }
+
+  /**
+   * Adds a row into this block: summed into the existing row if the index is
+   * already present, otherwise a copy of the row is inserted at its sorted
+   * position.
+   *
+   * @param index
+   *          row index within the block
+   * @param row
+   *          row to add; it is cloned on insertion, never retained
+   */
+  public void plusRow(int index, Vector row) {
+    /*
+     * often accumulation goes in row-increasing order, so check for this to
+     * avoid binary search (another log Height multiplier).
+     */
+
+    int pos =
+      numRows == 0 || rowIndices[numRows - 1] < index ? -numRows - 1 : Arrays
+        .binarySearch(rowIndices, 0, numRows, index);
+    if (pos >= 0) {
+      rows[pos].assign(row, PlusMult.plusMult(1));
+    } else {
+      // negative result encodes the insertion point as -(insertionPoint) - 1
+      insertIntoPos(-pos - 1, index, row);
+    }
+  }
+
+  /**
+   * Inserts a copy of the row at the given slot, shifting later slots right
+   * and growing the arrays when they are full.
+   */
+  private void insertIntoPos(int pos, int rowIndex, Vector row) {
+    // reallocate if needed
+    if (numRows == rows.length) {
+      int newCapacity = (numRows + 1) << 1;
+      rows = Arrays.copyOf(rows, newCapacity);
+      rowIndices = Arrays.copyOf(rowIndices, newCapacity);
+    }
+    // make a hole if needed
+    System.arraycopy(rows, pos, rows, pos + 1, numRows - pos);
+    System.arraycopy(rowIndices, pos, rowIndices, pos + 1, numRows - pos);
+    // put
+    rowIndices[pos] = rowIndex;
+    rows[pos] = row.clone();
+    numRows++;
+  }
+
+  /**
+   * pluses one block into another. Use it for accumulation of partial products in
+   * combiners and reducers.
+   * 
+   * @param bOther
+   *          block to add
+   */
+  public void plusBlock(SparseRowBlockWritable bOther) {
+    /*
+     * since we maintained row indices in a sorted order, we can run sort merge
+     * to expedite this operation
+     */
+    int i = 0;
+    int j = 0;
+    while (i < numRows && j < bOther.numRows) {
+      // skip our rows that precede the next row of the other block
+      while (i < numRows && rowIndices[i] < bOther.rowIndices[j]) {
+        i++;
+      }
+      if (i < numRows) {
+        if (rowIndices[i] == bOther.rowIndices[j]) {
+          rows[i].assign(bOther.rows[j], PlusMult.plusMult(1));
+        } else {
+          // insert into i-th position
+          insertIntoPos(i, bOther.rowIndices[j], bOther.rows[j]);
+        }
+        // increment in either case
+        i++;
+        j++;
+      }
+    }
+    // whatever is left in the other block sorts after all of our rows
+    for (; j < bOther.numRows; j++) {
+      insertIntoPos(numRows, bOther.rowIndices[j], bOther.rows[j]);
+    }
+  }
+
+  public int getNumRows() {
+    return numRows;
+  }
+
+  /**
+   * Empties the block and releases all row references; the arrays keep their
+   * capacity.
+   */
+  public void clear() {
+    numRows = 0;
+    Arrays.fill(rows, null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SplitPartitionedWritable.java
----------------------------------------------------------------------
diff --git 
a/mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SplitPartitionedWritable.java
 
b/mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SplitPartitionedWritable.java
new file mode 100644
index 0000000..7caeb4a
--- /dev/null
+++ 
b/mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SplitPartitionedWritable.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.math.hadoop.stochasticsvd;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.math.Varint;
+
+/**
+ * a key for vectors allowing to identify them by their coordinates in original
+ * split of A.
+ * 
+ * We assume all passes over A result in the same splits; thus, we can always
+ * prepare side files that come into contact with A so that they are sorted
+ * and partitioned the same way.
+ * <P>
+ * 
+ * The hash code is defined so that all records of the same split go to the
+ * same reducer.
+ * <P>
+ * 
+ * In addition, we define a grouping comparator that groups all records of one
+ * split into the same reducer group.
+ * <P>
+ * 
+ */
+public class SplitPartitionedWritable implements
+    WritableComparable<SplitPartitionedWritable> {
+
+  /** Id of the map task that produced the record. */
+  private int taskId;
+  /** Ordinal of the record within its task. */
+  private long taskItemOrdinal;
+
+  /**
+   * Creates a key bound to the task of the given mapper context.
+   *
+   * @param mapperContext
+   *          context whose task id is captured
+   */
+  public SplitPartitionedWritable(Mapper<?, ?, ?, ?>.Context mapperContext) {
+    // presumably the task id identifies the input split -- TODO confirm
+    taskId = mapperContext.getTaskAttemptID().getTaskID().getId();
+  }
+
+  /** No-arg constructor; fields are populated later, e.g. by {@link #readFields}. */
+  public SplitPartitionedWritable() {
+  }
+
+  public int getTaskId() {
+    return taskId;
+  }
+
+  public long getTaskItemOrdinal() {
+    return taskItemOrdinal;
+  }
+
+  /** Advances the item ordinal by one. */
+  public void incrementItemOrdinal() {
+    taskItemOrdinal++;
+  }
+
+  public void setTaskItemOrdinal(long taskItemOrdinal) {
+    this.taskItemOrdinal = taskItemOrdinal;
+  }
+
+  /** Reads the task id, then the item ordinal, both varint-encoded. */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    taskId = Varint.readUnsignedVarInt(in);
+    taskItemOrdinal = Varint.readUnsignedVarLong(in);
+  }
+
+  /** Writes the task id, then the item ordinal, both varint-encoded. */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    Varint.writeUnsignedVarInt(taskId, out);
+    Varint.writeUnsignedVarLong(taskItemOrdinal, out);
+  }
+
+  /** Depends on the task id only; the item ordinal does not contribute. */
+  @Override
+  public int hashCode() {
+    return 31 + taskId;
+  }
+
+  /** Two keys are equal when they are of the same class and share a task id. */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    return taskId == ((SplitPartitionedWritable) obj).taskId;
+  }
+
+  /** Orders by task id first, then by item ordinal. */
+  @Override
+  public int compareTo(SplitPartitionedWritable o) {
+    if (taskId != o.taskId) {
+      return taskId < o.taskId ? -1 : 1;
+    }
+    if (taskItemOrdinal != o.taskItemOrdinal) {
+      return taskItemOrdinal < o.taskItemOrdinal ? -1 : 1;
+    }
+    return 0;
+  }
+
+  /**
+   * Comparator that looks at the task id only, ignoring the item ordinal.
+   */
+  public static final class SplitGroupingComparator extends WritableComparator implements Serializable {
+
+    public SplitGroupingComparator() {
+      super(SplitPartitionedWritable.class, true);
+    }
+
+    @Override
+    public int compare(Object a, Object b) {
+      SplitPartitionedWritable left = (SplitPartitionedWritable) a;
+      SplitPartitionedWritable right = (SplitPartitionedWritable) b;
+
+      if (left.taskId == right.taskId) {
+        return 0;
+      }
+      return left.taskId < right.taskId ? -1 : 1;
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/UJob.java
----------------------------------------------------------------------
diff --git 
a/mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/UJob.java 
b/mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/UJob.java
new file mode 100644
index 0000000..a6db079
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/UJob.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.stochasticsvd;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Matrix;
+import org.apache.mahout.math.NamedVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.function.Functions;
+
+/**
+ * Computes U=Q*Uhat of SSVD (optionally adding x pow(Sigma, 0.5) )
+ * 
+ */
+public class UJob {
+  private static final String OUTPUT_U = "u";
+  private static final String PROP_UHAT_PATH = "ssvd.uhat.path";
+  private static final String PROP_SIGMA_PATH = "ssvd.sigma.path";
+  private static final String PROP_OUTPUT_SCALING = "ssvd.u.output.scaling";
+  private static final String PROP_K = "ssvd.k";
+
+  private Job job;
+
+  /**
+   * Configures the map-only U job and submits it without waiting; call
+   * {@link #waitForCompletion()} afterwards.
+   *
+   * @param conf base configuration for the job
+   * @param inputPathQ job input path (rows of Q)
+   * @param inputUHatPath location of the Uhat matrix, passed to the mapper
+   * @param sigmaPath location of the singular values, passed to the mapper
+   * @param outputPath job output path
+   * @param k number of output columns, passed to the mapper
+   * @param numReduceTasks not used; the job is configured with zero reducers
+   * @param labelClass output key class
+   * @param outputScaling scaling mode, passed to the mapper by name
+   */
+  public void run(Configuration conf, Path inputPathQ, Path inputUHatPath,
+      Path sigmaPath, Path outputPath, int k, int numReduceTasks,
+      Class<? extends Writable> labelClass, SSVDSolver.OutputScalingEnum outputScaling)
+    throws ClassNotFoundException, InterruptedException, IOException {
+
+    job = new Job(conf);
+    Configuration jobConf = job.getConfiguration();
+
+    job.setJobName("U-job");
+    job.setJarByClass(UJob.class);
+
+    // sequence files in, block-compressed sequence files out
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    FileInputFormat.setInputPaths(job, inputPathQ);
+    FileOutputFormat.setOutputPath(job, outputPath);
+
+    // WARN: tight hadoop integration here:
+    jobConf.set("mapreduce.output.basename", OUTPUT_U);
+    FileOutputFormat.setCompressOutput(job, true);
+    FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
+    SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
+
+    job.setMapperClass(UMapper.class);
+    job.setMapOutputKeyClass(IntWritable.class);
+    job.setMapOutputValueClass(VectorWritable.class);
+
+    job.setOutputKeyClass(labelClass);
+    job.setOutputValueClass(VectorWritable.class);
+
+    // parameters read back in UMapper.setup()
+    jobConf.set(PROP_UHAT_PATH, inputUHatPath.toString());
+    jobConf.set(PROP_SIGMA_PATH, sigmaPath.toString());
+    jobConf.set(PROP_OUTPUT_SCALING, outputScaling.name());
+    jobConf.setInt(PROP_K, k);
+
+    job.setNumReduceTasks(0);
+    job.submit();
+
+  }
+
+  /**
+   * Blocks until the submitted job finishes.
+   *
+   * @throws IOException if the job did not complete successfully
+   */
+  public void waitForCompletion() throws IOException, ClassNotFoundException,
+      InterruptedException {
+    job.waitForCompletion(false);
+
+    if (!job.isSuccessful()) {
+      throw new IOException("U job unsuccessful.");
+    }
+
+  }
+
+  /**
+   * Maps each input row q to the k dot products of q with the first k columns
+   * of Uhat, optionally multiplied by the loaded scaling values.
+   */
+  public static final class UMapper extends
+      Mapper<Writable, VectorWritable, Writable, VectorWritable> {
+
+    private Matrix uHat;
+    private DenseVector uRow;
+    private VectorWritable uRowWritable;
+    private int kp;
+    private int k;
+    private Vector sValues;
+
+    @Override
+    protected void map(Writable key, VectorWritable value, Context context)
+      throws IOException, InterruptedException {
+      Vector qRow = value.get();
+      for (int col = 0; col < k; col++) {
+        double projection = qRow.dot(uHat.viewColumn(col));
+        // sValues stays null when no output scaling was requested
+        uRow.setQuick(col,
+                      sValues == null ? projection : projection * sValues.getQuick(col));
+      }
+
+      /*
+       * MAHOUT-1067: inherit A names too.
+       */
+      Vector outRow = qRow instanceof NamedVector
+        ? new NamedVector(uRow, ((NamedVector) qRow).getName())
+        : uRow;
+      uRowWritable.set(outRow);
+
+      context.write(key, uRowWritable); // U inherits original A row labels.
+    }
+
+    @Override
+    protected void setup(Context context) throws IOException,
+        InterruptedException {
+      super.setup(context);
+      Configuration conf = context.getConfiguration();
+      Path uHatPath = new Path(conf.get(PROP_UHAT_PATH));
+      Path sigmaPath = new Path(conf.get(PROP_SIGMA_PATH));
+      FileSystem fs = FileSystem.get(uHatPath.toUri(), conf);
+
+      uHat = SSVDHelper.drmLoadAsDense(fs, uHatPath, conf);
+      // since uHat is (k+p) x (k+p)
+      kp = uHat.columnSize();
+      k = conf.getInt(PROP_K, kp);
+      uRow = new DenseVector(k);
+      uRowWritable = new VectorWritable(uRow);
+
+      SSVDSolver.OutputScalingEnum outputScaling =
+        SSVDSolver.OutputScalingEnum.valueOf(conf.get(PROP_OUTPUT_SCALING));
+      boolean halfSigma = outputScaling == SSVDSolver.OutputScalingEnum.HALFSIGMA;
+      if (halfSigma || outputScaling == SSVDSolver.OutputScalingEnum.SIGMA) {
+        sValues = SSVDHelper.loadVector(sigmaPath, conf);
+        if (halfSigma) {
+          // scale by square roots of the loaded values instead
+          sValues.assign(Functions.SQRT);
+        }
+      }
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/VJob.java
----------------------------------------------------------------------
diff --git 
a/mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/VJob.java 
b/mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/VJob.java
new file mode 100644
index 0000000..daee93d
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/VJob.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.stochasticsvd;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.mahout.math.DenseMatrix;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Matrix;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.function.Functions;
+import org.apache.mahout.math.function.PlusMult;
+
+public class VJob {
+  private static final String OUTPUT_V = "v";
+  private static final String PROP_UHAT_PATH = "ssvd.uhat.path";
+  private static final String PROP_SIGMA_PATH = "ssvd.sigma.path";
+  private static final String PROP_OUTPUT_SCALING = "ssvd.v.output.scaling";
+  private static final String PROP_K = "ssvd.k";
+  public static final String PROP_SQ_PATH = "ssvdpca.sq.path";
+  public static final String PROP_XI_PATH = "ssvdpca.xi.path";
+
+  private Job job;
+
+  /**
+   * Maps each input column b of B' to a row of V: for the first k columns of
+   * Uhat, the dot product of b with that column divided by the matching
+   * scaling value.
+   */
+  public static final class VMapper extends
+      Mapper<IntWritable, VectorWritable, IntWritable, VectorWritable> {
+
+    private Matrix uHat;
+    private Vector vRow;
+    private Vector sValues;
+    private VectorWritable vRowWritable;
+    private int kp;
+    private int k;
+    /*
+     * xi and s_q are PCA-related corrections, per MAHOUT-817
+     */
+    private Vector xi;
+    private Vector sq;
+    private final PlusMult plusMult = new PlusMult(0);
+
+    /**
+     * Emits one V row per input record under the same key. When the PCA
+     * correction is active, the input vector is modified in place before the
+     * projection.
+     */
+    @Override
+    protected void map(IntWritable key, VectorWritable value, Context context)
+      throws IOException, InterruptedException {
+      Vector bCol = value.get();
+      /*
+       * MAHOUT-817: PCA correction for B': b_{col=i} -= s_q * xi_{i}
+       */
+      if (xi != null) {
+        /*
+         * code defensively against shortened xi which may be externally
+         * supplied
+         */
+        int btIndex = key.get();
+        double xii = xi.size() > btIndex ? xi.getQuick(btIndex) : 0.0;
+        plusMult.setMultiplicator(-xii);
+        bCol.assign(sq, plusMult);
+      }
+
+      for (int i = 0; i < k; i++) {
+        vRow.setQuick(i, bCol.dot(uHat.viewColumn(i)) / sValues.getQuick(i));
+      }
+      context.write(key, vRowWritable);
+    }
+
+    /**
+     * Loads Uhat, the divisor vector for the configured output scaling and,
+     * when configured, the PCA correction vectors.
+     */
+    @Override
+    protected void setup(Context context) throws IOException,
+      InterruptedException {
+      super.setup(context);
+
+      Configuration conf = context.getConfiguration();
+      Path uHatPath = new Path(conf.get(PROP_UHAT_PATH));
+      // resolve the file system from the path itself (as UJob does) so that
+      // Uhat stored outside the default file system can still be loaded
+      FileSystem fs = FileSystem.get(uHatPath.toUri(), conf);
+
+      Path sigmaPath = new Path(conf.get(PROP_SIGMA_PATH));
+
+      uHat = SSVDHelper.drmLoadAsDense(fs, uHatPath, conf);
+      // since uHat is (k+p) x (k+p)
+      kp = uHat.columnSize();
+      k = conf.getInt(PROP_K, kp);
+      vRow = new DenseVector(k);
+      vRowWritable = new VectorWritable(vRow);
+
+      // map() divides by sValues, so each scaling mode adjusts the divisor
+      sValues = SSVDHelper.loadVector(sigmaPath, conf);
+      SSVDSolver.OutputScalingEnum outputScaling =
+        SSVDSolver.OutputScalingEnum.valueOf(conf.get(PROP_OUTPUT_SCALING));
+      switch (outputScaling) {
+        case SIGMA:
+          // divisor of 1 in every position
+          sValues.assign(1.0);
+          break;
+        case HALFSIGMA:
+          // divisor is the square root of the values already loaded above
+          sValues.assign(Functions.SQRT);
+          break;
+        default:
+          // keep the loaded values as the divisor
+      }
+
+      /*
+       * PCA -related corrections (MAHOUT-817)
+       */
+      String xiPathStr = conf.get(PROP_XI_PATH);
+      if (xiPathStr != null) {
+        xi = SSVDHelper.loadAndSumUpVectors(new Path(xiPathStr), conf);
+        sq =
+          SSVDHelper.loadAndSumUpVectors(new Path(conf.get(PROP_SQ_PATH)), conf);
+      }
+
+    }
+
+  }
+
+  /**
+   * Configures the map-only V job and submits it without waiting; call
+   * {@link #waitForCompletion()} afterwards.
+   *
+   * @param conf
+   *          base configuration for the job
+   * @param inputPathBt
+   *          job input path (B')
+   * @param xiPath
+   *          PCA row mean (MAHOUT-817, to fix B'); may be null, in which case
+   *          no PCA options are set
+   * @param sqPath
+   *          sq (MAHOUT-817, to fix B'); only read when xiPath is not null
+   * @param inputUHatPath
+   *          location of the Uhat matrix, passed to the mapper
+   * @param inputSigmaPath
+   *          location of the singular values, passed to the mapper
+   * @param outputPath
+   *          job output path
+   * @param k
+   *          number of output columns, passed to the mapper
+   * @param numReduceTasks
+   *          not used; the job is configured with zero reducers
+   * @param outputScaling output scaling: apply Sigma, or Sigma^0.5, or none
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  public void run(Configuration conf,
+                  Path inputPathBt,
+                  Path xiPath,
+                  Path sqPath,
+
+                  Path inputUHatPath,
+                  Path inputSigmaPath,
+
+                  Path outputPath,
+                  int k,
+                  int numReduceTasks,
+                  SSVDSolver.OutputScalingEnum outputScaling) throws ClassNotFoundException,
+    InterruptedException, IOException {
+
+    job = new Job(conf);
+    job.setJobName("V-job");
+    job.setJarByClass(VJob.class);
+
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    FileInputFormat.setInputPaths(job, inputPathBt);
+    FileOutputFormat.setOutputPath(job, outputPath);
+
+    // Warn: tight hadoop integration here:
+    job.getConfiguration().set("mapreduce.output.basename", OUTPUT_V);
+    FileOutputFormat.setCompressOutput(job, true);
+    FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
+    SequenceFileOutputFormat.setOutputCompressionType(job,
+                                                      CompressionType.BLOCK);
+
+    job.setMapOutputKeyClass(IntWritable.class);
+    job.setMapOutputValueClass(VectorWritable.class);
+
+    job.setOutputKeyClass(IntWritable.class);
+    job.setOutputValueClass(VectorWritable.class);
+
+    job.setMapperClass(VMapper.class);
+
+    job.getConfiguration().set(PROP_UHAT_PATH, inputUHatPath.toString());
+    job.getConfiguration().set(PROP_SIGMA_PATH, inputSigmaPath.toString());
+    job.getConfiguration().set(PROP_OUTPUT_SCALING, outputScaling.name());
+    job.getConfiguration().setInt(PROP_K, k);
+    job.setNumReduceTasks(0);
+
+    /*
+     * PCA-related options, MAHOUT-817
+     */
+    if (xiPath != null) {
+      job.getConfiguration().set(PROP_XI_PATH, xiPath.toString());
+      job.getConfiguration().set(PROP_SQ_PATH, sqPath.toString());
+    }
+
+    job.submit();
+
+  }
+
+  /**
+   * Blocks until the submitted job finishes.
+   *
+   * @throws IOException
+   *           if the job did not complete successfully
+   */
+  public void waitForCompletion() throws IOException, ClassNotFoundException,
+    InterruptedException {
+    job.waitForCompletion(false);
+
+    if (!job.isSuccessful()) {
+      throw new IOException("V job unsuccessful.");
+    }
+
+  }
+
+}

Reply via email to