http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java
new file mode 100644
index 0000000..1277bae
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java
@@ -0,0 +1,628 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.stochasticsvd;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.lib.MultipleOutputs;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.IOUtils;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterator;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterator;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.NamedVector;
+import org.apache.mahout.math.UpperTriangular;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.function.Functions;
+import org.apache.mahout.math.function.PlusMult;
+import org.apache.mahout.math.hadoop.stochasticsvd.qr.QRLastStep;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Deque;
+
+/**
+ * Bt job. For details, see working notes in MAHOUT-376.
+ * <p/>
+ * <p/>
+ * Uses hadoop deprecated API wherever new api has not been updated
+ * (MAHOUT-593), hence @SuppressWarning("deprecation").
+ * <p/>
+ * <p/>
+ * This job outputs either Bt in its standard output, or upper triangular
+ * matrices representing BBt partial sums if that's requested. If the latter
+ * mode is enabled, then we accumulate BBt outer product sums in an upper
+ * triangular accumulator and output it at the end of the job, thus saving space
+ * and a separate BBt job.
+ * <p/>
+ * <p/>
+ * This job also outputs Q and Bt and optionally BBt. Bt is output to standard
+ * job output (part-*) and Q and BBt use named multiple outputs.
+ * <p/>
+ * <p/>
+ */
+@SuppressWarnings("deprecation")
+public final class BtJob {
+
+  public static final String OUTPUT_Q = "Q";
+  public static final String OUTPUT_BT = "part";
+  public static final String OUTPUT_BBT = "bbt";
+  public static final String OUTPUT_SQ = "sq";
+  public static final String OUTPUT_SB = "sb";
+
+  public static final String PROP_QJOB_PATH = "ssvd.QJob.path";
+  public static final String PROP_OUPTUT_BBT_PRODUCTS =
+    "ssvd.BtJob.outputBBtProducts";
+  public static final String PROP_OUTER_PROD_BLOCK_HEIGHT =
+    "ssvd.outerProdBlockHeight";
+  public static final String PROP_RHAT_BROADCAST = "ssvd.rhat.broadcast";
+  public static final String PROP_XI_PATH = "ssvdpca.xi.path";
+  public static final String PROP_NV = "ssvd.nv";
+
+  private BtJob() {
+  }
+
+  public static class BtMapper extends
+    Mapper<Writable, VectorWritable, LongWritable, SparseRowBlockWritable> {
+
+    private QRLastStep qr;
+    private final Deque<Closeable> closeables = new ArrayDeque<>();
+
+    private int blockNum;
+    private MultipleOutputs outputs;
+    private final VectorWritable qRowValue = new VectorWritable();
+    private Vector btRow;
+    private SparseRowBlockAccumulator btCollector;
+    private Context mapContext;
+    private boolean nv;
+
+    // pca stuff
+    private Vector sqAccum;
+    private boolean computeSq;
+
+    /**
+     * We maintain A and QtHat inputs partitioned the same way, so we
+     * essentially are performing map-side merge here of A and QtHats except
+     * QtHat is stored not row-wise but block-wise.
+     */
+    @Override
+    protected void map(Writable key, VectorWritable value, Context context)
+      throws IOException, InterruptedException {
+
+      mapContext = context;
+      // output Bt outer products
+      Vector aRow = value.get();
+
+      Vector qRow = qr.next();
+      int kp = qRow.size();
+
+      // make sure Qs are inheriting A row labels.
+      outputQRow(key, qRow, aRow);
+
+      // MAHOUT-817
+      if (computeSq) {
+        if (sqAccum == null) {
+          sqAccum = new DenseVector(kp);
+        }
+        sqAccum.assign(qRow, Functions.PLUS);
+      }
+
+      if (btRow == null) {
+        btRow = new DenseVector(kp);
+      }
+
+      if (!aRow.isDense()) {
+        for (Vector.Element el : aRow.nonZeroes()) {
+          double mul = el.get();
+          for (int j = 0; j < kp; j++) {
+            btRow.setQuick(j, mul * qRow.getQuick(j));
+          }
+          btCollector.collect((long) el.index(), btRow);
+        }
+      } else {
+        int n = aRow.size();
+        for (int i = 0; i < n; i++) {
+          double mul = aRow.getQuick(i);
+          for (int j = 0; j < kp; j++) {
+            btRow.setQuick(j, mul * qRow.getQuick(j));
+          }
+          btCollector.collect((long) i, btRow);
+        }
+      }
+    }
+
+    @Override
+    protected void setup(Context context) throws IOException,
+      InterruptedException {
+      super.setup(context);
+
+      Configuration conf = context.getConfiguration();
+
+      Path qJobPath = new Path(conf.get(PROP_QJOB_PATH));
+
+      /*
+       * actually this is kind of dangerous because this routine thinks we need
+       * to create a file name for our current job and this will use -m-, so it's
+       * just serendipity that we are calling it from the mapper too, as the QJob did.
+       */
+      Path qInputPath =
+        new Path(qJobPath, FileOutputFormat.getUniqueFile(context,
+                                                          QJob.OUTPUT_QHAT,
+                                                          ""));
+      blockNum = context.getTaskAttemptID().getTaskID().getId();
+
+      SequenceFileValueIterator<DenseBlockWritable> qhatInput =
+        new SequenceFileValueIterator<>(qInputPath,
+                                                          true,
+                                                          conf);
+      closeables.addFirst(qhatInput);
+
+      /*
+       * read all r files _in order of task ids_, i.e. partitions (aka group
+       * nums).
+       *
+       * Note: if broadcast option is used, this comes from distributed cache
+       * files rather than hdfs path.
+       */
+
+      SequenceFileDirValueIterator<VectorWritable> rhatInput;
+
+      boolean distributedRHat = conf.get(PROP_RHAT_BROADCAST) != null;
+      if (distributedRHat) {
+
+        Path[] rFiles = HadoopUtil.getCachedFiles(conf);
+
+        Validate.notNull(rFiles,
+                         "no RHat files in distributed cache job definition");
+        //TODO: this probably can be replaced w/ local fs makeQualified
+        Configuration lconf = new Configuration();
+        lconf.set("fs.default.name", "file:///");
+
+        rhatInput =
+          new SequenceFileDirValueIterator<>(rFiles,
+                                                           SSVDHelper.PARTITION_COMPARATOR,
+                                                           true,
+                                                           lconf);
+
+      } else {
+        Path rPath = new Path(qJobPath, QJob.OUTPUT_RHAT + "-*");
+        rhatInput =
+          new SequenceFileDirValueIterator<>(rPath,
+                                                           PathType.GLOB,
+                                                           null,
+                                                           SSVDHelper.PARTITION_COMPARATOR,
+                                                           true,
+                                                           conf);
+      }
+
+      Validate.isTrue(rhatInput.hasNext(), "Empty R-hat input!");
+
+      closeables.addFirst(rhatInput);
+      outputs = new MultipleOutputs(new JobConf(conf));
+      closeables.addFirst(new IOUtils.MultipleOutputsCloseableAdapter(outputs));
+
+      qr = new QRLastStep(qhatInput, rhatInput, blockNum);
+      closeables.addFirst(qr);
+      /*
+       * it so happens that the current QRLastStep implementation preloads the R
+       * sequence into memory in its constructor, so it's ok to close the rhat input
+       * now.
+       */
+      if (!rhatInput.hasNext()) {
+        closeables.remove(rhatInput);
+        rhatInput.close();
+      }
+
+      OutputCollector<LongWritable, SparseRowBlockWritable> btBlockCollector =
+        new OutputCollector<LongWritable, SparseRowBlockWritable>() {
+
+          @Override
+          public void collect(LongWritable blockKey,
+                              SparseRowBlockWritable block) throws IOException {
+            try {
+              mapContext.write(blockKey, block);
+            } catch (InterruptedException exc) {
+              throw new IOException("Interrupted.", exc);
+            }
+          }
+        };
+
+      btCollector =
+        new SparseRowBlockAccumulator(conf.getInt(PROP_OUTER_PROD_BLOCK_HEIGHT,
+                                                  -1), btBlockCollector);
+      closeables.addFirst(btCollector);
+
+      // MAHOUT-817
+      computeSq = conf.get(PROP_XI_PATH) != null;
+
+      // MAHOUT-1067
+      nv = conf.getBoolean(PROP_NV, false);
+
+    }
+
+    @Override
+    protected void cleanup(Context context) throws IOException,
+      InterruptedException {
+      try {
+        if (sqAccum != null) {
+          /*
+           * hack: we will output sq partial sums with index -1 for summation.
+           */
+          SparseRowBlockWritable sbrw = new SparseRowBlockWritable(1);
+          sbrw.plusRow(0, sqAccum);
+          LongWritable lw = new LongWritable(-1);
+          context.write(lw, sbrw);
+        }
+      } finally {
+        IOUtils.close(closeables);
+      }
+    }
+
+    @SuppressWarnings("unchecked")
+    private void outputQRow(Writable key, Vector qRow, Vector aRow) throws IOException {
+      if (nv && (aRow instanceof NamedVector)) {
+        qRowValue.set(new NamedVector(qRow, ((NamedVector) aRow).getName()));
+      } else {
+        qRowValue.set(qRow);
+      }
+      outputs.getCollector(OUTPUT_Q, null).collect(key, qRowValue);
+    }
+  }
+
+  public static class OuterProductCombiner
+    extends
+    Reducer<Writable, SparseRowBlockWritable, Writable, SparseRowBlockWritable> {
+
+    protected final SparseRowBlockWritable accum = new SparseRowBlockWritable();
+    protected final Deque<Closeable> closeables = new ArrayDeque<>();
+    protected int blockHeight;
+
+    @Override
+    protected void setup(Context context) throws IOException,
+      InterruptedException {
+      blockHeight =
+        context.getConfiguration().getInt(PROP_OUTER_PROD_BLOCK_HEIGHT, -1);
+    }
+
+    @Override
+    protected void reduce(Writable key,
+                          Iterable<SparseRowBlockWritable> values,
+                          Context context) throws IOException,
+      InterruptedException {
+      for (SparseRowBlockWritable bw : values) {
+        accum.plusBlock(bw);
+      }
+      context.write(key, accum);
+      accum.clear();
+    }
+
+    @Override
+    protected void cleanup(Context context) throws IOException,
+      InterruptedException {
+
+      IOUtils.close(closeables);
+    }
+  }
+
+  public static class OuterProductReducer
+    extends
+    Reducer<LongWritable, SparseRowBlockWritable, IntWritable, VectorWritable> {
+
+    protected final SparseRowBlockWritable accum = new SparseRowBlockWritable();
+    protected final Deque<Closeable> closeables = new ArrayDeque<>();
+
+    protected int blockHeight;
+    private boolean outputBBt;
+    private UpperTriangular mBBt;
+    private MultipleOutputs outputs;
+    private final IntWritable btKey = new IntWritable();
+    private final VectorWritable btValue = new VectorWritable();
+
+    // MAHOUT-817
+    private Vector xi;
+    private final PlusMult pmult = new PlusMult(0);
+    private Vector sbAccum;
+
+    @Override
+    protected void setup(Context context) throws IOException,
+      InterruptedException {
+
+      Configuration conf = context.getConfiguration();
+      blockHeight = conf.getInt(PROP_OUTER_PROD_BLOCK_HEIGHT, -1);
+
+      outputBBt = conf.getBoolean(PROP_OUPTUT_BBT_PRODUCTS, false);
+
+      if (outputBBt) {
+        int k = conf.getInt(QJob.PROP_K, -1);
+        int p = conf.getInt(QJob.PROP_P, -1);
+
+        Validate.isTrue(k > 0, "invalid k parameter");
+        Validate.isTrue(p >= 0, "invalid p parameter");
+        mBBt = new UpperTriangular(k + p);
+
+      }
+
+      String xiPathStr = conf.get(PROP_XI_PATH);
+      if (xiPathStr != null) {
+        xi = SSVDHelper.loadAndSumUpVectors(new Path(xiPathStr), conf);
+        if (xi == null) {
+          throw new IOException(String.format("unable to load mean path xi 
from %s.",
+                                              xiPathStr));
+        }
+      }
+
+      if (outputBBt || xi != null) {
+        outputs = new MultipleOutputs(new JobConf(conf));
+        closeables.addFirst(new IOUtils.MultipleOutputsCloseableAdapter(outputs));
+      }
+
+    }
+
+    @Override
+    protected void reduce(LongWritable key,
+                          Iterable<SparseRowBlockWritable> values,
+                          Context context) throws IOException,
+      InterruptedException {
+
+      accum.clear();
+      for (SparseRowBlockWritable bw : values) {
+        accum.plusBlock(bw);
+      }
+
+      // MAHOUT-817:
+      if (key.get() == -1L) {
+
+        Vector sq = accum.getRows()[0];
+
+        @SuppressWarnings("unchecked")
+        OutputCollector<IntWritable, VectorWritable> sqOut =
+          outputs.getCollector(OUTPUT_SQ, null);
+
+        sqOut.collect(new IntWritable(0), new VectorWritable(sq));
+        return;
+      }
+
+      /*
+       * at this point, sum of rows should be in accum, so we just generate
+       * outer self product of it and add to BBt accumulator.
+       */
+
+      for (int k = 0; k < accum.getNumRows(); k++) {
+        Vector btRow = accum.getRows()[k];
+        btKey.set((int) (key.get() * blockHeight + accum.getRowIndices()[k]));
+        btValue.set(btRow);
+        context.write(btKey, btValue);
+
+        if (outputBBt) {
+          int kp = mBBt.numRows();
+          // accumulate partial BBt sum
+          for (int i = 0; i < kp; i++) {
+            double vi = btRow.get(i);
+            if (vi != 0.0) {
+              for (int j = i; j < kp; j++) {
+                double vj = btRow.get(j);
+                if (vj != 0.0) {
+                  mBBt.setQuick(i, j, mBBt.getQuick(i, j) + vi * vj);
+                }
+              }
+            }
+          }
+        }
+
+        // MAHOUT-817
+        if (xi != null) {
+          // code defensively against shortened xi
+          int btIndex = btKey.get();
+          double xii = xi.size() > btIndex ? xi.getQuick(btIndex) : 0.0;
+          // compute s_b
+          pmult.setMultiplicator(xii);
+          if (sbAccum == null) {
+            sbAccum = new DenseVector(btRow.size());
+          }
+          sbAccum.assign(btRow, pmult);
+        }
+
+      }
+    }
+
+    @Override
+    protected void cleanup(Context context) throws IOException,
+      InterruptedException {
+
+      // if we output BBt instead of Bt then we need to do it.
+      try {
+        if (outputBBt) {
+
+          @SuppressWarnings("unchecked")
+          OutputCollector<Writable, Writable> collector =
+            outputs.getCollector(OUTPUT_BBT, null);
+
+          collector.collect(new IntWritable(),
+                            new VectorWritable(new DenseVector(mBBt.getData())));
+        }
+
+        // MAHOUT-817
+        if (sbAccum != null) {
+          @SuppressWarnings("unchecked")
+          OutputCollector<IntWritable, VectorWritable> collector =
+            outputs.getCollector(OUTPUT_SB, null);
+
+          collector.collect(new IntWritable(), new VectorWritable(sbAccum));
+
+        }
+      } finally {
+        IOUtils.close(closeables);
+      }
+
+    }
+  }
+
+  public static void run(Configuration conf,
+                         Path[] inputPathA,
+                         Path inputPathQJob,
+                         Path xiPath,
+                         Path outputPath,
+                         int minSplitSize,
+                         int k,
+                         int p,
+                         int btBlockHeight,
+                         int numReduceTasks,
+                         boolean broadcast,
+                         Class<? extends Writable> labelClass,
+                         boolean outputBBtProducts)
+    throws ClassNotFoundException, InterruptedException, IOException {
+
+    JobConf oldApiJob = new JobConf(conf);
+
+    MultipleOutputs.addNamedOutput(oldApiJob,
+                                   OUTPUT_Q,
+                                   org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
+                                   labelClass,
+                                   VectorWritable.class);
+
+    if (outputBBtProducts) {
+      MultipleOutputs.addNamedOutput(oldApiJob,
+                                     OUTPUT_BBT,
+                                     org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
+                                     IntWritable.class,
+                                     VectorWritable.class);
+      /*
+       * MAHOUT-1067: if we are asked to output BBT products then named vector
+       * names should be propagated to Q too so that UJob could pick them up
+       * from there.
+       */
+      oldApiJob.setBoolean(PROP_NV, true);
+    }
+    if (xiPath != null) {
+      // compute pca -related stuff as well
+      MultipleOutputs.addNamedOutput(oldApiJob,
+                                     OUTPUT_SQ,
+                                     org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
+                                     IntWritable.class,
+                                     VectorWritable.class);
+      MultipleOutputs.addNamedOutput(oldApiJob,
+                                     OUTPUT_SB,
+                                     org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
+                                     IntWritable.class,
+                                     VectorWritable.class);
+    }
+
+    /*
+     * HACK: we use old api multiple outputs since they are not available in the
+     * new api of either 0.20.2 or 0.20.203 but wrap it into a new api job so we
+     * can use new api interfaces.
+     */
+
+    Job job = new Job(oldApiJob);
+    job.setJobName("Bt-job");
+    job.setJarByClass(BtJob.class);
+
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    FileInputFormat.setInputPaths(job, inputPathA);
+    if (minSplitSize > 0) {
+      FileInputFormat.setMinInputSplitSize(job, minSplitSize);
+    }
+    FileOutputFormat.setOutputPath(job, outputPath);
+
+    // WARN: tight hadoop integration here:
+    job.getConfiguration().set("mapreduce.output.basename", OUTPUT_BT);
+
+    FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
+    SequenceFileOutputFormat.setOutputCompressionType(job,
+                                                      CompressionType.BLOCK);
+
+    job.setMapOutputKeyClass(LongWritable.class);
+    job.setMapOutputValueClass(SparseRowBlockWritable.class);
+
+    job.setOutputKeyClass(IntWritable.class);
+    job.setOutputValueClass(VectorWritable.class);
+
+    job.setMapperClass(BtMapper.class);
+    job.setCombinerClass(OuterProductCombiner.class);
+    job.setReducerClass(OuterProductReducer.class);
+
+    job.getConfiguration().setInt(QJob.PROP_K, k);
+    job.getConfiguration().setInt(QJob.PROP_P, p);
+    job.getConfiguration().set(PROP_QJOB_PATH, inputPathQJob.toString());
+    job.getConfiguration().setBoolean(PROP_OUPTUT_BBT_PRODUCTS,
+                                      outputBBtProducts);
+    job.getConfiguration().setInt(PROP_OUTER_PROD_BLOCK_HEIGHT, btBlockHeight);
+
+    job.setNumReduceTasks(numReduceTasks);
+
+    /*
+     * PCA-related options, MAHOUT-817
+     */
+    if (xiPath != null) {
+      job.getConfiguration().set(PROP_XI_PATH, xiPath.toString());
+    }
+
+    /*
+     * we can broadcast Rhat files since all of them are required by each job,
+     * but not Q files, which correspond to splits of A (so each split of A will
+     * require only a particular Q file, a different one each time).
+     */
+
+    if (broadcast) {
+      job.getConfiguration().set(PROP_RHAT_BROADCAST, "y");
+
+      FileSystem fs = FileSystem.get(inputPathQJob.toUri(), conf);
+      FileStatus[] fstats =
+        fs.globStatus(new Path(inputPathQJob, QJob.OUTPUT_RHAT + "-*"));
+      if (fstats != null) {
+        for (FileStatus fstat : fstats) {
+          /*
+           * new api is not enabled yet in our dependencies at this time, still
+           * using deprecated one
+           */
+          DistributedCache.addCacheFile(fstat.getPath().toUri(),
+                                        job.getConfiguration());
+        }
+      }
+    }
+
+    job.submit();
+    job.waitForCompletion(false);
+
+    if (!job.isSuccessful()) {
+      throw new IOException("Bt job unsuccessful.");
+    }
+  }
+}
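
The mapper and reducer above distribute the accumulation Bt = A' * Q one A-row at a time: each element a(i,j) contributes a(i,j) * qRow to row j of Bt, and the combiner/reducer sum those partial rows per block. A minimal in-memory sketch of that arithmetic follows; the class name, plain arrays and dense layout are illustrative only and are not part of this commit.

// Sketch: the arithmetic BtMapper/OuterProductReducer distribute, done in memory.
// bt[j] accumulates a(i,j) * q[i], i.e. Bt = A' * Q.
public final class BtAccumulationSketch {
  public static double[][] accumulateBt(double[][] a, double[][] q) {
    int m = a.length;      // rows of A (and of Q)
    int n = a[0].length;   // columns of A -> rows of Bt
    int kp = q[0].length;  // k + p
    double[][] bt = new double[n][kp];
    for (int i = 0; i < m; i++) {
      for (int j = 0; j < n; j++) {
        double aij = a[i][j];
        if (aij == 0.0) {
          continue;        // mirrors the sparse branch of BtMapper.map()
        }
        for (int c = 0; c < kp; c++) {
          bt[j][c] += aij * q[i][c];
        }
      }
    }
    return bt;
  }
}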

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/DenseBlockWritable.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/DenseBlockWritable.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/DenseBlockWritable.java
new file mode 100644
index 0000000..6a9b352
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/DenseBlockWritable.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.math.hadoop.stochasticsvd;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Ad-hoc substitution for {@link org.apache.mahout.math.MatrixWritable}.
+ * Perhaps more useful for situations with mostly dense data (such as Q-blocks)
+ * but reduces GC by reusing the same block memory between loads and writes.
+ * <p>
+ * 
+ * In the case of Q blocks it doesn't even matter whether this data is dense, because
+ * we need to unpack it into dense form for fast access in computations anyway, and
+ * even if it is not so dense, the block compressor in sequence files will take
+ * care of the serialized size.
+ * <p>
+ */
+public class DenseBlockWritable implements Writable {
+  private double[][] block;
+
+  public void setBlock(double[][] block) {
+    this.block = block;
+  }
+
+  public double[][] getBlock() {
+    return block;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int m = in.readInt();
+    int n = in.readInt();
+    if (block == null) {
+      block = new double[m][0];
+    } else if (block.length != m) {
+      block = Arrays.copyOf(block, m);
+    }
+    for (int i = 0; i < m; i++) {
+      if (block[i] == null || block[i].length != n) {
+        block[i] = new double[n];
+      }
+      for (int j = 0; j < n; j++) {
+        block[i][j] = in.readDouble();
+      }
+
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    int m = block.length;
+    int n = block.length == 0 ? 0 : block[0].length;
+
+    out.writeInt(m);
+    out.writeInt(n);
+    for (double[] aBlock : block) {
+      for (int j = 0; j < n; j++) {
+        out.writeDouble(aBlock[j]);
+      }
+    }
+  }
+
+}
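
For illustration, a small round trip through the write()/readFields() wire format above (an int row count, an int column count, then row-major doubles) using plain java.io streams; the class name and values are hypothetical, and in the actual jobs this type is serialized inside block-compressed SequenceFiles.

package org.apache.mahout.math.hadoop.stochasticsvd;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Sketch: round-trip a 2x3 block through the DenseBlockWritable wire format.
public final class DenseBlockWritableRoundTrip {
  public static void main(String[] args) throws IOException {
    DenseBlockWritable out = new DenseBlockWritable();
    out.setBlock(new double[][] {{1, 2, 3}, {4, 5, 6}});

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    out.write(new DataOutputStream(bytes));

    DenseBlockWritable in = new DenseBlockWritable();
    in.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    // in.getBlock() now holds the same 2x3 values and reuses that buffer on later reads.
  }
}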

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/Omega.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/Omega.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/Omega.java
new file mode 100644
index 0000000..a5f32ad
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/Omega.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.stochasticsvd;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Lists;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.Vector.Element;
+
+/**
+ * simplistic implementation for Omega matrix in Stochastic SVD method
+ */
+public class Omega {
+
+  private static final double UNIFORM_DIVISOR = Math.pow(2.0, 64);
+
+  private final long seed;
+  private final int kp;
+
+  public Omega(long seed, int kp) {
+    this.seed = seed;
+    this.kp = kp;
+  }
+
+  /**
+   * Get omega element at (x,y) uniformly distributed within [-1...1)
+   *
+   * @param row
+   *          omega row
+   * @param column
+   *          omega column
+   */
+  public double getQuick(int row, int column) {
+    long hash = murmur64((long) row << Integer.SIZE | column, 8, seed);
+    return hash / UNIFORM_DIVISOR;
+  }
+
+  /**
+   * compute YRow=ARow*Omega.
+   * 
+   * @param aRow
+   *          row of matrix A (size n)
+   * @param yRow
+   *          row of matrix Y (result) must be pre-allocated to size of (k+p)
+   */
+  @Deprecated
+  public void computeYRow(Vector aRow, double[] yRow) {
+    // assert yRow.length == kp;
+    Arrays.fill(yRow, 0.0);
+    if (aRow.isDense()) {
+      int n = aRow.size();
+      for (int j = 0; j < n; j++) {
+        accumDots(j, aRow.getQuick(j), yRow);
+      }
+    } else {
+      for (Element el : aRow.nonZeroes()) {
+        accumDots(el.index(), el.get(), yRow);
+      }
+    }
+  }
+
+  /**
+   * A version to compute yRow as a sparse vector in case of extremely sparse
+   * matrices
+   * 
+   * @param aRow
+   * @param yRowOut
+   */
+  public void computeYRow(Vector aRow, Vector yRowOut) {
+    yRowOut.assign(0.0);
+    if (aRow.isDense()) {
+      int n = aRow.size();
+      for (int j = 0; j < n; j++) {
+        accumDots(j, aRow.getQuick(j), yRowOut);
+      }
+    } else {
+      for (Element el : aRow.nonZeroes()) {
+        accumDots(el.index(), el.get(), yRowOut);
+      }
+    }
+  }
+
+  /*
+   * computes t(Omega) %*% v in multithreaded fashion
+   */
+  public Vector mutlithreadedTRightMultiply(final Vector v) {
+
+    int nThreads = Runtime.getRuntime().availableProcessors();
+    ExecutorService es =
+      new ThreadPoolExecutor(nThreads,
+                             nThreads,
+                             1,
+                             TimeUnit.SECONDS,
+                             new ArrayBlockingQueue<Runnable>(kp));
+
+    try {
+
+      List<Future<Double>> dotFutures = Lists.newArrayListWithCapacity(kp);
+
+      for (int i = 0; i < kp; i++) {
+        final int index = i;
+
+        Future<Double> dotFuture = es.submit(new Callable<Double>() {
+          @Override
+          public Double call() throws Exception {
+            double result = 0.0;
+            if (v.isDense()) {
+              for (int k = 0; k < v.size(); k++) {
+                // it's ok, this is reentrant
+                result += getQuick(k, index) * v.getQuick(k);
+              }
+
+            } else {
+              for (Element el : v.nonZeroes()) {
+                int k = el.index();
+                result += getQuick(k, index) * el.get();
+              }
+            }
+            return result;
+          }
+        });
+        dotFutures.add(dotFuture);
+      }
+
+      try {
+        Vector res = new DenseVector(kp);
+        for (int i = 0; i < kp; i++) {
+          res.setQuick(i, dotFutures.get(i).get());
+        }
+        return res;
+      } catch (InterruptedException exc) {
+        throw new IllegalStateException("Interrupted", exc);
+      } catch (ExecutionException exc) {
+        if (exc.getCause() instanceof RuntimeException) {
+          throw (RuntimeException) exc.getCause();
+        } else {
+          throw new IllegalStateException(exc.getCause());
+        }
+      }
+
+    } finally {
+      es.shutdown();
+    }
+  }
+
+  protected void accumDots(int aIndex, double aElement, double[] yRow) {
+    for (int i = 0; i < kp; i++) {
+      yRow[i] += getQuick(aIndex, i) * aElement;
+    }
+  }
+
+  protected void accumDots(int aIndex, double aElement, Vector yRow) {
+    for (int i = 0; i < kp; i++) {
+      yRow.setQuick(i, yRow.getQuick(i) + getQuick(aIndex, i) * aElement);
+    }
+  }
+
+  /**
+   * Shortened version for data < 8 bytes packed into {@code len} lowest bytes
+   * of {@code val}.
+   * 
+   * @param val
+   *          the value
+   * @param len
+   *          the length of data packed into this many low bytes of {@code val}
+   * @param seed
+   *          the seed to use
+   * @return murmur hash
+   */
+  public static long murmur64(long val, int len, long seed) {
+
+    // assert len > 0 && len <= 8;
+    long m = 0xc6a4a7935bd1e995L;
+    long h = seed ^ len * m;
+
+    long k = val;
+
+    k *= m;
+    int r = 47;
+    k ^= k >>> r;
+    k *= m;
+
+    h ^= k;
+    h *= m;
+
+    h ^= h >>> r;
+    h *= m;
+    h ^= h >>> r;
+    return h;
+  }
+
+  public static long murmur64(byte[] val, int offset, int len, long seed) {
+
+    long m = 0xc6a4a7935bd1e995L;
+    int r = 47;
+    long h = seed ^ (len * m);
+
+    int lt = len >>> 3;
+    for (int i = 0; i < lt; i++, offset += 8) {
+      long k = 0;
+      for (int j = 0; j < 8; j++) {
+        k <<= 8;
+        k |= val[offset + j] & 0xff;
+      }
+
+      k *= m;
+      k ^= k >>> r;
+      k *= m;
+
+      h ^= k;
+      h *= m;
+    }
+
+    if (offset < len) {
+      long k = 0;
+      while (offset < len) {
+        k <<= 8;
+        k |= val[offset] & 0xff;
+        offset++;
+      }
+      h ^= k;
+      h *= m;
+    }
+
+    h ^= h >>> r;
+    h *= m;
+    h ^= h >>> r;
+    return h;
+
+  }
+
+}
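
As a usage sketch, Y = A * Omega can be formed row by row with computeYRow(); Omega entries are fully determined by (row, column, seed) via murmur64, so each mapper can reconstruct the same Omega without ever materializing it. The class name, seed and vector sizes below are made-up illustration values.

package org.apache.mahout.math.hadoop.stochasticsvd;

import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.Vector;

// Sketch: project one row of A into k+p dimensions with Omega.
public final class OmegaSketch {
  public static void main(String[] args) {
    long seed = 1234L;
    int kp = 5;                                  // k + p
    Omega omega = new Omega(seed, kp);

    Vector aRow = new DenseVector(new double[] {0.5, 0.0, 2.0, -1.0});
    Vector yRow = new DenseVector(kp);
    omega.computeYRow(aRow, yRow);               // yRow = aRow * Omega

    // Omega entries are deterministic in (row, column, seed), e.g.:
    double o00 = omega.getQuick(0, 0);
    System.out.println(yRow + " " + o00);
  }
}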

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/QJob.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/QJob.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/QJob.java
new file mode 100644
index 0000000..76dc299
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/QJob.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.stochasticsvd;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Deque;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.lib.MultipleOutputs;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.mahout.common.IOUtils;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.function.Functions;
+import org.apache.mahout.math.hadoop.stochasticsvd.qr.QRFirstStep;
+
+/**
+ * Compute first level of QHat-transpose blocks.
+ * <P>
+ * 
+ * See Mahout-376 working notes for details.
+ * <P>
+ * 
+ * Uses some of Hadoop deprecated api wherever newer api is not available.
+ * Hence, @SuppressWarnings("deprecation") for imports (MAHOUT-593).
+ * <P>
+ * 
+ */
+@SuppressWarnings("deprecation")
+public final class QJob {
+
+  public static final String PROP_OMEGA_SEED = "ssvd.omegaseed";
+  public static final String PROP_K = QRFirstStep.PROP_K;
+  public static final String PROP_P = QRFirstStep.PROP_P;
+  public static final String PROP_SB_PATH = "ssvdpca.sb.path";
+  public static final String PROP_AROWBLOCK_SIZE =
+    QRFirstStep.PROP_AROWBLOCK_SIZE;
+
+  public static final String OUTPUT_RHAT = "R";
+  public static final String OUTPUT_QHAT = "QHat";
+
+  private QJob() {
+  }
+
+  public static class QMapper
+      extends
+      Mapper<Writable, VectorWritable, SplitPartitionedWritable, VectorWritable> {
+
+    private MultipleOutputs outputs;
+    private final Deque<Closeable> closeables = Lists.newLinkedList();
+    private SplitPartitionedWritable qHatKey;
+    private SplitPartitionedWritable rHatKey;
+    private Vector yRow;
+    private Vector sb;
+    private Omega omega;
+    private int kp;
+
+    private QRFirstStep qr;
+
+    @Override
+    protected void setup(Context context) throws IOException,
+      InterruptedException {
+
+      Configuration conf = context.getConfiguration();
+      int k = Integer.parseInt(conf.get(PROP_K));
+      int p = Integer.parseInt(conf.get(PROP_P));
+      kp = k + p;
+      long omegaSeed = Long.parseLong(conf.get(PROP_OMEGA_SEED));
+      omega = new Omega(omegaSeed, k + p);
+
+      String sbPathStr = conf.get(PROP_SB_PATH);
+      if (sbPathStr != null) {
+        sb = SSVDHelper.loadAndSumUpVectors(new Path(sbPathStr), conf);
+        if (sb == null)
+          throw new IOException(String.format("Unable to load s_omega from 
path %s.", sbPathStr));
+      }
+
+      outputs = new MultipleOutputs(new JobConf(conf));
+      closeables.addFirst(new Closeable() {
+        @Override
+        public void close() throws IOException {
+          outputs.close();
+        }
+      });
+
+      qHatKey = new SplitPartitionedWritable(context);
+      rHatKey = new SplitPartitionedWritable(context);
+
+      OutputCollector<Writable, DenseBlockWritable> qhatCollector =
+        new OutputCollector<Writable, DenseBlockWritable>() {
+
+          @Override
+          @SuppressWarnings("unchecked")
+          public void collect(Writable nil, DenseBlockWritable dbw)
+            throws IOException {
+            outputs.getCollector(OUTPUT_QHAT, null).collect(qHatKey, dbw);
+            qHatKey.incrementItemOrdinal();
+          }
+        };
+
+      OutputCollector<Writable, VectorWritable> rhatCollector =
+        new OutputCollector<Writable, VectorWritable>() {
+
+          @Override
+          @SuppressWarnings("unchecked")
+          public void collect(Writable nil, VectorWritable rhat)
+            throws IOException {
+            outputs.getCollector(OUTPUT_RHAT, null).collect(rHatKey, rhat);
+            rHatKey.incrementItemOrdinal();
+          }
+        };
+
+      qr = new QRFirstStep(conf, qhatCollector, rhatCollector);
+      closeables.addFirst(qr); // important: qr closes first!!
+      yRow = new DenseVector(kp);
+    }
+
+    @Override
+    protected void map(Writable key, VectorWritable value, Context context)
+      throws IOException, InterruptedException {
+      omega.computeYRow(value.get(), yRow);
+      if (sb != null) {
+        yRow.assign(sb, Functions.MINUS);
+      }
+      qr.collect(key, yRow);
+    }
+
+    @Override
+    protected void cleanup(Context context) throws IOException,
+      InterruptedException {
+      IOUtils.close(closeables);
+    }
+  }
+
+  public static void run(Configuration conf,
+                         Path[] inputPaths,
+                         Path sbPath,
+                         Path outputPath,
+                         int aBlockRows,
+                         int minSplitSize,
+                         int k,
+                         int p,
+                         long seed,
+                         int numReduceTasks) throws ClassNotFoundException,
+    InterruptedException, IOException {
+
+    JobConf oldApiJob = new JobConf(conf);
+    MultipleOutputs.addNamedOutput(oldApiJob,
+                                   OUTPUT_QHAT,
+                                   org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
+                                   SplitPartitionedWritable.class,
+                                   DenseBlockWritable.class);
+    MultipleOutputs.addNamedOutput(oldApiJob,
+                                   OUTPUT_RHAT,
+                                   org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
+                                   SplitPartitionedWritable.class,
+                                   VectorWritable.class);
+
+    Job job = new Job(oldApiJob);
+    job.setJobName("Q-job");
+    job.setJarByClass(QJob.class);
+
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    FileInputFormat.setInputPaths(job, inputPaths);
+    if (minSplitSize > 0) {
+      FileInputFormat.setMinInputSplitSize(job, minSplitSize);
+    }
+
+    FileOutputFormat.setOutputPath(job, outputPath);
+
+    FileOutputFormat.setCompressOutput(job, true);
+    FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
+    SequenceFileOutputFormat.setOutputCompressionType(job,
+                                                      CompressionType.BLOCK);
+
+    job.setMapOutputKeyClass(SplitPartitionedWritable.class);
+    job.setMapOutputValueClass(VectorWritable.class);
+
+    job.setOutputKeyClass(SplitPartitionedWritable.class);
+    job.setOutputValueClass(VectorWritable.class);
+
+    job.setMapperClass(QMapper.class);
+
+    job.getConfiguration().setInt(PROP_AROWBLOCK_SIZE, aBlockRows);
+    job.getConfiguration().setLong(PROP_OMEGA_SEED, seed);
+    job.getConfiguration().setInt(PROP_K, k);
+    job.getConfiguration().setInt(PROP_P, p);
+    if (sbPath != null) {
+      job.getConfiguration().set(PROP_SB_PATH, sbPath.toString());
+    }
+
+    /*
+     * number of reduce tasks doesn't matter. we don't actually send anything to
+     * reducers.
+     */
+
+    job.setNumReduceTasks(0 /* numReduceTasks */);
+
+    job.submit();
+    job.waitForCompletion(false);
+
+    if (!job.isSuccessful()) {
+      throw new IOException("Q job unsuccessful.");
+    }
+
+  }
+
+}
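
For orientation, a hedged sketch of how the first two pipeline stages chain together, using the run() signatures of QJob above and BtJob earlier in this commit; all paths, parameter values and the class name are hypothetical, and the real sequencing (including power iterations) is driven by SSVDSolver.

package org.apache.mahout.math.hadoop.stochasticsvd;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;

// Sketch: run QJob then BtJob with illustrative parameters.
public final class QThenBtSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path[] inputA = { new Path("/tmp/ssvd/A") };
    Path qPath = new Path("/tmp/ssvd/Q-job");
    Path btPath = new Path("/tmp/ssvd/Bt-job");

    int k = 10, p = 15, r = 10000, h = 30000, minSplit = -1, reducers = 4;
    long seed = System.currentTimeMillis();

    // 1) Y = A * Omega, blockwise QR of Y -> QHat / RHat under qPath
    QJob.run(conf, inputA, null /* no PCA s_b */, qPath, r, minSplit, k, p, seed, reducers);

    // 2) Bt = A' * Q written under btPath (BBt partial sums not requested here)
    BtJob.run(conf, inputA, qPath, null /* no PCA xi */, btPath, minSplit, k, p, h,
              reducers, true /* broadcast RHat */, IntWritable.class,
              false /* outputBBtProducts */);
  }
}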

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java
new file mode 100644
index 0000000..7b4fefb
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java
@@ -0,0 +1,201 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.stochasticsvd;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.hadoop.MatrixColumnMeansJob;
+
+/**
+ * Mahout CLI adapter for SSVDSolver
+ */
+public class SSVDCli extends AbstractJob {
+
+  @Override
+  public int run(String[] args) throws Exception {
+    addInputOption();
+    addOutputOption();
+    addOption("rank", "k", "decomposition rank", true);
+    addOption("oversampling", "p", "oversampling", String.valueOf(15));
+    addOption("blockHeight",
+              "r",
+              "Y block height (must be > (k+p))",
+              String.valueOf(10000));
+    addOption("outerProdBlockHeight",
+              "oh",
+              "block height of outer products during multiplication, increase 
for sparse inputs",
+              String.valueOf(30000));
+    addOption("abtBlockHeight",
+              "abth",
+              "block height of Y_i in ABtJob during AB' multiplication, 
increase for extremely sparse inputs",
+              String.valueOf(200000));
+    addOption("minSplitSize", "s", "minimum split size", String.valueOf(-1));
+    addOption("computeU", "U", "compute U (true/false)", String.valueOf(true));
+    addOption("uHalfSigma",
+              "uhs",
+              "Compute U * Sigma^0.5",
+              String.valueOf(false));
+    addOption("uSigma", "us", "Compute U * Sigma", String.valueOf(false));
+    addOption("computeV", "V", "compute V (true/false)", String.valueOf(true));
+    addOption("vHalfSigma",
+              "vhs",
+              "compute V * Sigma^0.5",
+              String.valueOf(false));
+    addOption("reduceTasks",
+              "t",
+              "number of reduce tasks (where applicable)",
+              true);
+    addOption("powerIter",
+              "q",
+              "number of additional power iterations (0..2 is good)",
+              String.valueOf(0));
+    addOption("broadcast",
+              "br",
+              "whether use distributed cache to broadcast matrices wherever 
possible",
+              String.valueOf(true));
+    addOption("pca",
+              "pca",
+              "run in pca mode: compute column-wise mean and subtract from 
input",
+              String.valueOf(false));
+    addOption("pcaOffset",
+              "xi",
+              "path(glob) of external pca mean (optional, dont compute, use 
external mean");
+    addOption(DefaultOptionCreator.overwriteOption().create());
+
+    Map<String, List<String>> pargs = parseArguments(args);
+    if (pargs == null) {
+      return -1;
+    }
+
+    int k = Integer.parseInt(getOption("rank"));
+    int p = Integer.parseInt(getOption("oversampling"));
+    int r = Integer.parseInt(getOption("blockHeight"));
+    int h = Integer.parseInt(getOption("outerProdBlockHeight"));
+    int abh = Integer.parseInt(getOption("abtBlockHeight"));
+    int q = Integer.parseInt(getOption("powerIter"));
+    int minSplitSize = Integer.parseInt(getOption("minSplitSize"));
+    boolean computeU = Boolean.parseBoolean(getOption("computeU"));
+    boolean computeV = Boolean.parseBoolean(getOption("computeV"));
+    boolean cUHalfSigma = Boolean.parseBoolean(getOption("uHalfSigma"));
+    boolean cUSigma = Boolean.parseBoolean(getOption("uSigma"));
+    boolean cVHalfSigma = Boolean.parseBoolean(getOption("vHalfSigma"));
+    int reduceTasks = Integer.parseInt(getOption("reduceTasks"));
+    boolean broadcast = Boolean.parseBoolean(getOption("broadcast"));
+    String xiPathStr = getOption("pcaOffset");
+    Path xiPath = xiPathStr == null ? null : new Path(xiPathStr);
+    boolean pca = Boolean.parseBoolean(getOption("pca")) || xiPath != null;
+
+    boolean overwrite = hasOption(DefaultOptionCreator.OVERWRITE_OPTION);
+
+    Configuration conf = getConf();
+    if (conf == null) {
+      throw new IOException("No Hadoop configuration present");
+    }
+
+    Path[] inputPaths = { getInputPath() };
+    Path tempPath = getTempPath();
+    FileSystem fs = FileSystem.get(getTempPath().toUri(), conf);
+
+    // housekeeping
+    if (overwrite) {
+      // clear the output path
+      HadoopUtil.delete(getConf(), getOutputPath());
+      // clear the temp path
+      HadoopUtil.delete(getConf(), getTempPath());
+    }
+
+    fs.mkdirs(getOutputPath());
+
+    // MAHOUT-817
+    if (pca && xiPath == null) {
+      xiPath = new Path(tempPath, "xi");
+      if (overwrite) {
+        fs.delete(xiPath, true);
+      }
+      MatrixColumnMeansJob.run(conf, inputPaths[0], xiPath);
+    }
+
+    SSVDSolver solver =
+      new SSVDSolver(conf,
+                     inputPaths,
+                     new Path(tempPath, "ssvd"),
+                     r,
+                     k,
+                     p,
+                     reduceTasks);
+
+    solver.setMinSplitSize(minSplitSize);
+    solver.setComputeU(computeU);
+    solver.setComputeV(computeV);
+    solver.setcUHalfSigma(cUHalfSigma);
+    solver.setcVHalfSigma(cVHalfSigma);
+    solver.setcUSigma(cUSigma);
+    solver.setOuterBlockHeight(h);
+    solver.setAbtBlockHeight(abh);
+    solver.setQ(q);
+    solver.setBroadcast(broadcast);
+    solver.setOverwrite(overwrite);
+
+    if (xiPath != null) {
+      solver.setPcaMeanPath(new Path(xiPath, "part-*"));
+    }
+
+    solver.run();
+
+    Vector svalues = solver.getSingularValues().viewPart(0, k);
+    SSVDHelper.saveVector(svalues, getOutputPath("sigma"), conf);
+
+    if (computeU && !fs.rename(new Path(solver.getUPath()), getOutputPath())) {
+      throw new IOException("Unable to move U results to the output path.");
+    }
+    if (cUHalfSigma
+        && !fs.rename(new Path(solver.getuHalfSigmaPath()), getOutputPath())) {
+      throw new IOException("Unable to move U*Sigma^0.5 results to the output 
path.");
+    }
+    if (cUSigma
+        && !fs.rename(new Path(solver.getuSigmaPath()), getOutputPath())) {
+      throw new IOException("Unable to move U*Sigma results to the output 
path.");
+    }
+    if (computeV && !fs.rename(new Path(solver.getVPath()), getOutputPath())) {
+      throw new IOException("Unable to move V results to the output path.");
+    }
+    if (cVHalfSigma
+        && !fs.rename(new Path(solver.getvHalfSigmaPath()), getOutputPath())) {
+      throw new IOException("Unable to move V*Sigma^0.5 results to the output 
path.");
+    }
+
+    // Delete the temp path on exit
+    fs.deleteOnExit(getTempPath());
+
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new SSVDCli(), args);
+  }
+
+}
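
A hedged usage sketch of the CLI adapter invoked programmatically through ToolRunner, which is what main() above does with the command-line arguments; paths, values and the class name are illustrative, and the long-form flags are assumed to follow the option names registered in run(), with -i/-o being AbstractJob's standard input/output options.

package org.apache.mahout.math.hadoop.stochasticsvd;

import org.apache.hadoop.util.ToolRunner;

// Sketch: drive SSVDCli programmatically with illustrative arguments.
public final class SSVDCliSketch {
  public static void main(String[] args) throws Exception {
    ToolRunner.run(new SSVDCli(), new String[] {
        "-i", "/tmp/ssvd/A",        // input DRM (sequence file of VectorWritable rows)
        "-o", "/tmp/ssvd/out",      // output directory (sigma, U, V, ...)
        "--rank", "10",             // k
        "--oversampling", "15",     // p
        "--reduceTasks", "4",
        "--overwrite"
    });
  }
}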

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDHelper.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDHelper.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDHelper.java
new file mode 100644
index 0000000..c585f33
--- /dev/null
+++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDHelper.java
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.stochasticsvd;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+import com.google.common.io.Closeables;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.IOUtils;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterator;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterator;
+import org.apache.mahout.math.DenseMatrix;
+import org.apache.mahout.math.DenseSymmetricMatrix;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Matrix;
+import org.apache.mahout.math.UpperTriangular;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.function.Functions;
+
+/**
+ * set of small file manipulation helpers.
+ */
+public final class SSVDHelper {
+
+  private static final Pattern OUTPUT_FILE_PATTERN = Pattern.compile("(\\w+)-(m|r)-(\\d+)(\\.\\w+)?");
+
+  private SSVDHelper() {
+  }
+
+  /**
+   * load single vector from an hdfs file (possibly presented as glob).
+   */
+  static Vector loadVector(Path glob, Configuration conf) throws IOException {
+
+    SequenceFileDirValueIterator<VectorWritable> iter =
+      new SequenceFileDirValueIterator<>(glob,
+                                                       PathType.GLOB,
+                                                       null,
+                                                       null,
+                                                       true,
+                                                       conf);
+
+    try {
+      if (!iter.hasNext()) {
+        throw new IOException("Empty input while reading vector");
+      }
+      VectorWritable vw = iter.next();
+
+      if (iter.hasNext()) {
+        throw new IOException("Unexpected data after the end of vector file");
+      }
+
+      return vw.get();
+
+    } finally {
+      Closeables.close(iter, true);
+    }
+  }
+
+  /**
+   * save single vector into hdfs file.
+   *
+   * @param v vector to save
+   */
+  public static void saveVector(Vector v,
+                                Path vectorFilePath,
+                                Configuration conf) throws IOException {
+    VectorWritable vw = new VectorWritable(v);
+    FileSystem fs = FileSystem.get(conf);
+    try (SequenceFile.Writer w = new SequenceFile.Writer(fs,
+        conf,
+        vectorFilePath,
+        IntWritable.class,
+        VectorWritable.class)) {
+      w.append(new IntWritable(), vw);
+    }
+      /*
+       * this is a writer, no quiet close please. we must bail out on incomplete
+       * close.
+       */
+
+  }
+
+  /**
+   * sniff label type in the input files
+   */
+  static Class<? extends Writable> sniffInputLabelType(Path[] inputPath,
+                                                       Configuration conf)
+    throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    for (Path p : inputPath) {
+      FileStatus[] fstats = fs.globStatus(p);
+      if (fstats == null || fstats.length == 0) {
+        continue;
+      }
+
+      FileStatus firstSeqFile;
+      if (fstats[0].isDir()) {
+        firstSeqFile = fs.listStatus(fstats[0].getPath(), PathFilters.logsCRCFilter())[0];
+      } else {
+        firstSeqFile = fstats[0];
+      }
+
+      SequenceFile.Reader r = null;
+      try {
+        r = new SequenceFile.Reader(fs, firstSeqFile.getPath(), conf);
+        return r.getKeyClass().asSubclass(Writable.class);
+      } finally {
+        Closeables.close(r, true);
+      }
+    }
+    throw new IOException("Unable to open input files to determine input label 
type.");
+  }
+
+  static final Comparator<FileStatus> PARTITION_COMPARATOR =
+    new Comparator<FileStatus>() {
+      private final Matcher matcher = OUTPUT_FILE_PATTERN.matcher("");
+
+      @Override
+      public int compare(FileStatus o1, FileStatus o2) {
+        matcher.reset(o1.getPath().getName());
+        if (!matcher.matches()) {
+          throw new IllegalArgumentException("Unexpected file name, unable to deduce partition #:"
+                                               + o1.getPath());
+        }
+        int p1 = Integer.parseInt(matcher.group(3));
+        matcher.reset(o2.getPath().getName());
+        if (!matcher.matches()) {
+          throw new IllegalArgumentException("Unexpected file name, unable to deduce partition #:"
+                                               + o2.getPath());
+        }
+
+        int p2 = Integer.parseInt(matcher.group(3));
+        return p1 - p2;
+      }
+
+    };
+
+  public static Iterator<Pair<Writable, Vector>> drmIterator(FileSystem fs, Path glob, Configuration conf,
+                                                             Deque<Closeable> closeables)
+    throws IOException {
+    SequenceFileDirIterator<Writable, VectorWritable> ret =
+      new SequenceFileDirIterator<>(glob,
+                                    PathType.GLOB,
+                                    PathFilters.logsCRCFilter(),
+                                    PARTITION_COMPARATOR,
+                                    true,
+                                    conf);
+    closeables.addFirst(ret);
+    return Iterators.transform(ret, new Function<Pair<Writable, VectorWritable>, Pair<Writable, Vector>>() {
+      @Override
+      public Pair<Writable, Vector> apply(Pair<Writable, VectorWritable> p) {
+        return new Pair<Writable, Vector>(p.getFirst(), p.getSecond().get());
+      }
+    });
+  }
+
+  /**
+   * Helper capability to load distributed row matrices into a dense matrix
+   * (to support tests mainly).
+   *
+   * @param fs   filesystem
+   * @param glob FS glob
+   * @param conf configuration
+   * @return dense in-core matrix (null if the glob matched no rows)
+   */
+  public static DenseMatrix drmLoadAsDense(FileSystem fs, Path glob, Configuration conf) throws IOException {
+
+    Deque<Closeable> closeables = new ArrayDeque<>();
+    try {
+      List<double[]> denseData = new ArrayList<>();
+      for (Iterator<Pair<Writable, Vector>> iter = drmIterator(fs, glob, conf, closeables);
+           iter.hasNext(); ) {
+        Pair<Writable, Vector> p = iter.next();
+        Vector v = p.getSecond();
+        double[] dd = new double[v.size()];
+        if (v.isDense()) {
+          for (int i = 0; i < v.size(); i++) {
+            dd[i] = v.getQuick(i);
+          }
+        } else {
+          for (Vector.Element el : v.nonZeroes()) {
+            dd[el.index()] = el.get();
+          }
+        }
+        denseData.add(dd);
+      }
+      if (denseData.isEmpty()) {
+        return null;
+      } else {
+        return new DenseMatrix(denseData.toArray(new double[denseData.size()][]));
+      }
+    } finally {
+      IOUtils.close(closeables);
+    }
+  }
+
+  /**
+   * Load multiple upper triangular matrices and sum them up.
+   *
+   * @return the sum of upper triangular inputs.
+   */
+  public static DenseSymmetricMatrix loadAndSumUpperTriangularMatricesAsSymmetric(Path glob, Configuration conf)
+    throws IOException {
+    Vector v = loadAndSumUpVectors(glob, conf);
+    return v == null ? null : new DenseSymmetricMatrix(v);
+  }
+
+  /**
+   * @return sum of all vectors in different files specified by glob
+   */
+  public static Vector loadAndSumUpVectors(Path glob, Configuration conf)
+    throws IOException {
+
+    SequenceFileDirValueIterator<VectorWritable> iter =
+      new SequenceFileDirValueIterator<>(glob,
+                                         PathType.GLOB,
+                                         null,
+                                         PARTITION_COMPARATOR,
+                                         true,
+                                         conf);
+
+    try {
+      Vector v = null;
+      while (iter.hasNext()) {
+        if (v == null) {
+          v = new DenseVector(iter.next().get());
+        } else {
+          v.assign(iter.next().get(), Functions.PLUS);
+        }
+      }
+      return v;
+
+    } finally {
+      Closeables.close(iter, true);
+    }
+
+  }
+
+  /**
+   * Load only one upper triangular matrix and issue an error if more than one
+   * is found.
+   */
+  public static UpperTriangular loadUpperTriangularMatrix(Path glob, Configuration conf) throws IOException {
+
+    /*
+     * there still may be more than one file in glob and only one of them must
+     * contain the matrix.
+     */
+
+    try (SequenceFileDirValueIterator<VectorWritable> iter = new SequenceFileDirValueIterator<>(glob,
+        PathType.GLOB,
+        null,
+        null,
+        true,
+        conf)) {
+      if (!iter.hasNext()) {
+        throw new IOException("No triangular matrices found");
+      }
+      Vector v = iter.next().get();
+      UpperTriangular result = new UpperTriangular(v);
+      if (iter.hasNext()) {
+        throw new IOException("Unexpected overrun in upper triangular matrix 
files");
+      }
+      return result;
+
+    }
+  }
+
+  /**
+   * Extracts row-wise raw data from a Mahout matrix for 3rd-party solvers.
+   * Unfortunately, the values member is fully encapsulated in
+   * {@link org.apache.mahout.math.DenseMatrix} at this point, so we have to
+   * resort to abstract element-wise copying.
+   */
+  public static double[][] extractRawData(Matrix m) {
+    int rows = m.numRows();
+    int cols = m.numCols();
+    double[][] result = new double[rows][];
+    for (int i = 0; i < rows; i++) {
+      result[i] = new double[cols];
+      for (int j = 0; j < cols; j++) {
+        result[i][j] = m.getQuick(i, j);
+      }
+    }
+    return result;
+  }
+
+}
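
For illustration, here is a minimal usage sketch of the helpers above. It is not
part of the patch; the paths and vector values are made up, and it assumes
same-package access since loadVector is package-private.

    // hypothetical round-trip through the SSVDHelper utilities
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path vecPath = new Path("/tmp/ssvd-demo/v.seq");

    // write a single vector, then read it back; loadVector throws if the glob
    // resolves to anything other than exactly one vector
    Vector v = new DenseVector(new double[] {1.0, 2.0, 3.0});
    SSVDHelper.saveVector(v, vecPath, conf);
    Vector loaded = SSVDHelper.loadVector(vecPath, conf);

    // pull a (test-sized) distributed row matrix glob into an in-core dense matrix
    DenseMatrix m = SSVDHelper.drmLoadAsDense(fs, new Path("/tmp/ssvd-demo/drm/*"), conf);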

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDSolver.java
----------------------------------------------------------------------
diff --git 
a/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDSolver.java
 
b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDSolver.java
new file mode 100644
index 0000000..94be450
--- /dev/null
+++ 
b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDSolver.java
@@ -0,0 +1,662 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.stochasticsvd;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.IOUtils;
+import org.apache.mahout.common.RandomUtils;
+import org.apache.mahout.math.*;
+import org.apache.mahout.math.function.Functions;
+import org.apache.mahout.math.solver.EigenDecomposition;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Deque;
+import java.util.Random;
+
+/**
+ * Stochastic SVD solver (API class).
+ * <p/>
+ * <p/>
+ * Implementation details are in my working notes in MAHOUT-376
+ * (https://issues.apache.org/jira/browse/MAHOUT-376).
+ * <p/>
+ * <p/>
+ * As of the time of this writing, I don't have benchmarks for this method in
+ * comparison to other methods. However, the non-hadoop differentiating
+ * characteristics of this method are thought to be:
+ * <LI>"faster": precision is traded off in favor of speed. However, there's a
+ * lever in the form of the "oversampling parameter" p. Higher values of p
+ * produce better precision but trade off speed (and minimum RAM requirement).
+ * This also means that this method is almost guaranteed to be less precise than
+ * Lanczos unless a full-rank SVD decomposition is sought.
+ * <LI>"more scale" -- can presumably take on larger problems than the Lanczos
+ * solver (not confirmed by benchmark at this time).
+ * <p/>
+ * <p/>
+ * <p/>
+ * Specifically in regard to this implementation, <i>I think</i> a couple of
+ * other differentiating points are:
+ * <LI>no need to specify the input matrix height or width on the command line;
+ * they are whatever the input turns out to be.
+ * <LI>supports any Writable as DRM row keys and copies them to the
+ * corresponding rows of the U matrix;
+ * <LI>can request U or V or U<sub>&sigma;</sub>=U* &Sigma;<sup>0.5</sup> or
+ * V<sub>&sigma;</sub>=V* &Sigma;<sup>0.5</sup>, none of which requires a pass
+ * over the input A, and these are parallel map-only jobs.
+ * <p/>
+ * <p/>
+ * <p/>
+ * This class is the central public API for the SSVD solver. The use pattern is
+ * as follows (a minimal driver sketch follows this file's diff):
+ * <p/>
+ * <UL>
+ * <LI>create the solver using the constructor, supplying computation parameters.
+ * <LI>set optional parameters through setter methods.
+ * <LI>call {@link #run()}.
+ * <LI> {@link #getUPath()} (if computed) returns the path to the directory
+ * containing m x k U matrix file(s).
+ * <LI> {@link #getVPath()} (if computed) returns the path to the directory
+ * containing n x k V matrix file(s).
+ * <p/>
+ * </UL>
+ */
+public final class SSVDSolver {
+
+  private Vector svalues;
+  private boolean computeU = true;
+  private boolean computeV = true;
+  private String uPath;
+  private String vPath;
+  private String uSigmaPath;
+  private String uHalfSigmaPath;
+  private String vSigmaPath;
+  private String vHalfSigmaPath;
+  private int outerBlockHeight = 30000;
+  private int abtBlockHeight = 200000;
+
+  // configured stuff
+  private final Configuration conf;
+  private final Path[] inputPath;
+  private final Path outputPath;
+  private final int ablockRows;
+  private final int k;
+  private final int p;
+  private int q;
+  private final int reduceTasks;
+  private int minSplitSize = -1;
+  private boolean cUHalfSigma;
+  private boolean cUSigma;
+  private boolean cVHalfSigma;
+  private boolean cVSigma;
+  private boolean overwrite;
+  private boolean broadcast = true;
+  private Path pcaMeanPath;
+
+  // for debugging
+  private long omegaSeed;
+
+  /**
+   * Create a new SSVD solver. Required parameters are passed to the constructor
+   * to ensure they are set. Optional parameters can be set using setters.
+   * <p/>
+   *
+   * @param conf        hadoop configuration
+   * @param inputPath   Input path (should be compatible with DistributedRowMatrix
+   *                    as of the time of this writing).
+   * @param outputPath  Output path containing U, V and singular values vector files.
+   * @param ablockRows  The vertical height of a q-block (bigger values require
+   *                    more memory in mappers and perhaps larger {@code minSplitSize} values)
+   * @param k           desired rank
+   * @param p           SSVD oversampling parameter
+   * @param reduceTasks Number of reduce tasks (where applicable)
+   */
+  public SSVDSolver(Configuration conf,
+                    Path[] inputPath,
+                    Path outputPath,
+                    int ablockRows,
+                    int k,
+                    int p,
+                    int reduceTasks) {
+    this.conf = conf;
+    this.inputPath = inputPath;
+    this.outputPath = outputPath;
+    this.ablockRows = ablockRows;
+    this.k = k;
+    this.p = p;
+    this.reduceTasks = reduceTasks;
+  }
+
+  public int getQ() {
+    return q;
+  }
+
+  /**
+   * Sets q, the number of additional power iterations used to increase
+   * precision (typically 0..2). Defaults to 0.
+   *
+   * @param q number of power iterations
+   */
+  public void setQ(int q) {
+    this.q = q;
+  }
+
+  /**
+   * The setting controlling whether to compute the U matrix of the low-rank
+   * SSVD. Default is true.
+   */
+  public void setComputeU(boolean val) {
+    computeU = val;
+  }
+
+  /**
+   * Setting controlling whether to compute the V matrix of the low-rank SSVD.
+   *
+   * @param val true if we want to output the V matrix. Default is true.
+   */
+  public void setComputeV(boolean val) {
+    computeV = val;
+  }
+
+  /**
+   * @param cUHat whether to produce U*Sigma^0.5 as well (default false)
+   */
+  public void setcUHalfSigma(boolean cUHat) {
+    this.cUHalfSigma = cUHat;
+  }
+
+  /**
+   * @param cVHat whether to produce V*Sigma^0.5 as well (default false)
+   */
+  public void setcVHalfSigma(boolean cVHat) {
+    this.cVHalfSigma = cVHat;
+  }
+
+  /**
+   * @param cUSigma whether to produce U*Sigma output as well (default false)
+   */
+  public void setcUSigma(boolean cUSigma) {
+    this.cUSigma = cUSigma;
+  }
+
+  /**
+   * @param cVSigma whether to produce V*Sigma output as well (default false)
+   */
+  public void setcVSigma(boolean cVSigma) {
+    this.cVSigma = cVSigma;
+  }
+
+  /**
+   * Sometimes, if the requested A blocks become larger than a split, we may
+   * need to use this to ensure that at least k+p rows of A get into a split.
+   * This is a requirement necessary to obtain orthonormalized Q blocks of SSVD.
+   *
+   * @param size the minimum split size to use
+   */
+  public void setMinSplitSize(int size) {
+    minSplitSize = size;
+  }
+
+  /**
+   * This contains the k+p singular values resulting from the solver run.
+   *
+   * @return singular values (largest to smallest)
+   */
+  public Vector getSingularValues() {
+    return svalues;
+  }
+
+  /**
+   * Returns the U path (if computation was requested and successful).
+   *
+   * @return U output hdfs path, or null if computation was not completed for
+   *         whatever reason.
+   */
+  public String getUPath() {
+    return uPath;
+  }
+
+  /**
+   * Returns the V path (if computation was requested and successful).
+   *
+   * @return V output hdfs path, or null if computation was not completed for
+   *         whatever reason.
+   */
+  public String getVPath() {
+    return vPath;
+  }
+
+  public String getuSigmaPath() {
+    return uSigmaPath;
+  }
+
+  public String getuHalfSigmaPath() {
+    return uHalfSigmaPath;
+  }
+
+  public String getvSigmaPath() {
+    return vSigmaPath;
+  }
+
+  public String getvHalfSigmaPath() {
+    return vHalfSigmaPath;
+  }
+
+  public boolean isOverwrite() {
+    return overwrite;
+  }
+
+  /**
+   * If true, the driver cleans the output folder first if it exists.
+   *
+   * @param overwrite whether to overwrite an existing output folder
+   */
+  public void setOverwrite(boolean overwrite) {
+    this.overwrite = overwrite;
+  }
+
+  public int getOuterBlockHeight() {
+    return outerBlockHeight;
+  }
+
+  /**
+   * The height of outer blocks during Q'A multiplication. Higher values produce
+   * fewer keys for the combine and shuffle-and-sort phases, somewhat improving
+   * running time, but require larger blocks to be formed in RAM (so setting this
+   * too high can lead to OOM).
+   *
+   * @param outerBlockHeight the outer block height to use
+   */
+  public void setOuterBlockHeight(int outerBlockHeight) {
+    this.outerBlockHeight = outerBlockHeight;
+  }
+
+  public int getAbtBlockHeight() {
+    return abtBlockHeight;
+  }
+
+  /**
+   * The block height of Y_i during power iterations. It is probably worth
+   * setting higher than the default of 200,000 for extremely sparse inputs and
+   * when more RAM is available. A Y_i block in the ABt job would occupy approx.
+   * abtBlockHeight x (k+p) x sizeof(double) bytes (as dense).
+   *
+   * @param abtBlockHeight the Y_i block height to use
+   */
+  public void setAbtBlockHeight(int abtBlockHeight) {
+    this.abtBlockHeight = abtBlockHeight;
+  }
+
+  public boolean isBroadcast() {
+    return broadcast;
+  }
+
+  /**
+   * If this property is true, use the DistributedCache mechanism to broadcast
+   * some data around. May improve efficiency. Default is true.
+   *
+   * @param broadcast whether to use DistributedCache broadcasting
+   */
+  public void setBroadcast(boolean broadcast) {
+    this.broadcast = broadcast;
+  }
+
+  /**
+   * Optional. Single-vector file path for a vector (aka xi in the MAHOUT-817
+   * working notes) to be subtracted from each row of input.
+   * <p/>
+   * <p/>
+   * A brute-force approach would turn the input into a dense input, which is
+   * often not very desirable. By supplying this offset to the SSVD solver, we
+   * can avoid most of the overhead due to increased input density.
+   * <p/>
+   * <p/>
+   * The vector size for this offset is n (the width of the A input). In PCA and
+   * R this is known as "column means", but in this case it can of course be any
+   * offset of the row vectors to propagate into the SSVD solution.
+   * <p/>
+   */
+  public Path getPcaMeanPath() {
+    return pcaMeanPath;
+  }
+
+  public void setPcaMeanPath(Path pcaMeanPath) {
+    this.pcaMeanPath = pcaMeanPath;
+  }
+
+  long getOmegaSeed() {
+    return omegaSeed;
+  }
+
+  /**
+   * Run all SSVD jobs.
+   *
+   * @throws IOException if an I/O error occurs.
+   */
+  public void run() throws IOException {
+
+    Deque<Closeable> closeables = Lists.newLinkedList();
+    try {
+      Class<? extends Writable> labelType =
+        SSVDHelper.sniffInputLabelType(inputPath, conf);
+      FileSystem fs = FileSystem.get(conf);
+
+      Path qPath = new Path(outputPath, "Q-job");
+      Path btPath = new Path(outputPath, "Bt-job");
+      Path uHatPath = new Path(outputPath, "UHat");
+      Path svPath = new Path(outputPath, "Sigma");
+      Path uPath = new Path(outputPath, "U");
+      Path uSigmaPath = new Path(outputPath, "USigma");
+      Path uHalfSigmaPath = new Path(outputPath, "UHalfSigma");
+      Path vPath = new Path(outputPath, "V");
+      Path vHalfSigmaPath = new Path(outputPath, "VHalfSigma");
+      Path vSigmaPath = new Path(outputPath, "VSigma");
+
+      Path pcaBasePath = new Path(outputPath, "pca");
+
+      if (overwrite) {
+        fs.delete(outputPath, true);
+      }
+
+      if (pcaMeanPath != null) {
+        fs.mkdirs(pcaBasePath);
+      }
+      Random rnd = RandomUtils.getRandom();
+      omegaSeed = rnd.nextLong();
+
+      Path sbPath = null;
+      double xisquaredlen = 0.0;
+      if (pcaMeanPath != null) {
+        /*
+         * Compute s_b0 if a pca offset is present.
+         *
+         * Just in case, we treat the xi path as a possible reduce or otherwise
+         * multiple-task output whose partial components we assume we need to
+         * sum up. If it is just one file, it will work too.
+         */
+
+        Vector xi = SSVDHelper.loadAndSumUpVectors(pcaMeanPath, conf);
+        if (xi == null) {
+          throw new IOException(String.format("unable to load mean path xi from %s.",
+                                              pcaMeanPath.toString()));
+        }
+
+        xisquaredlen = xi.dot(xi);
+        Omega omega = new Omega(omegaSeed, k + p);
+        Vector s_b0 = omega.mutlithreadedTRightMultiply(xi);
+
+        SSVDHelper.saveVector(s_b0, sbPath = new Path(pcaBasePath, "somega.seq"), conf);
+      }
+
+      /*
+       * If we work with a pca offset, we need to precompute s_bq0 aka s_omega
+       * for the jobs to use.
+       */
+
+      QJob.run(conf,
+               inputPath,
+               sbPath,
+               qPath,
+               ablockRows,
+               minSplitSize,
+               k,
+               p,
+               omegaSeed,
+               reduceTasks);
+
+      /*
+       * Restrict the number of reducers to a reasonable number so we don't have
+       * to run too many additions in the frontend when reconstructing BBt for
+       * the last B' and BB' computations. The user may not realize that and
+       * give a bit too many (I would be happy if that were ever the case,
+       * though).
+       */
+
+      BtJob.run(conf,
+                inputPath,
+                qPath,
+                pcaMeanPath,
+                btPath,
+                minSplitSize,
+                k,
+                p,
+                outerBlockHeight,
+                q <= 0 ? Math.min(1000, reduceTasks) : reduceTasks,
+                broadcast,
+                labelType,
+                q <= 0);
+
+      sbPath = new Path(btPath, BtJob.OUTPUT_SB + "-*");
+      Path sqPath = new Path(btPath, BtJob.OUTPUT_SQ + "-*");
+
+      // power iterations
+      for (int i = 0; i < q; i++) {
+
+        qPath = new Path(outputPath, String.format("ABt-job-%d", i + 1));
+        Path btPathGlob = new Path(btPath, BtJob.OUTPUT_BT + "-*");
+        ABtDenseOutJob.run(conf,
+                           inputPath,
+                           btPathGlob,
+                           pcaMeanPath,
+                           sqPath,
+                           sbPath,
+                           qPath,
+                           ablockRows,
+                           minSplitSize,
+                           k,
+                           p,
+                           abtBlockHeight,
+                           reduceTasks,
+                           broadcast);
+
+        btPath = new Path(outputPath, String.format("Bt-job-%d", i + 1));
+
+        BtJob.run(conf,
+                  inputPath,
+                  qPath,
+                  pcaMeanPath,
+                  btPath,
+                  minSplitSize,
+                  k,
+                  p,
+                  outerBlockHeight,
+                  i == q - 1 ? Math.min(1000, reduceTasks) : reduceTasks,
+                  broadcast,
+                  labelType,
+                  i == q - 1);
+        sbPath = new Path(btPath, BtJob.OUTPUT_SB + "-*");
+        sqPath = new Path(btPath, BtJob.OUTPUT_SQ + "-*");
+      }
+
+      DenseSymmetricMatrix bbt =
+        SSVDHelper.loadAndSumUpperTriangularMatricesAsSymmetric(
+          new Path(btPath, BtJob.OUTPUT_BBT + "-*"), conf);
+
+      // convert bbt to something our eigensolver could understand
+      assert bbt.columnSize() == k + p;
+
+      /*
+       * We currently use a 3rd-party in-core eigensolver, so we need just a
+       * dense array representation for it.
+       */
+      Matrix bbtSquare = new DenseMatrix(k + p, k + p);
+      bbtSquare.assign(bbt);
+
+      // MAHOUT-817
+      if (pcaMeanPath != null) {
+        Vector sq = SSVDHelper.loadAndSumUpVectors(sqPath, conf);
+        Vector sb = SSVDHelper.loadAndSumUpVectors(sbPath, conf);
+        Matrix mC = sq.cross(sb);
+
+        bbtSquare.assign(mC, Functions.MINUS);
+        bbtSquare.assign(mC.transpose(), Functions.MINUS);
+
+        Matrix outerSq = sq.cross(sq);
+        outerSq.assign(Functions.mult(xisquaredlen));
+        bbtSquare.assign(outerSq, Functions.PLUS);
+
+      }
+
+      EigenDecomposition eigen = new EigenDecomposition(bbtSquare);
+
+      Matrix uHat = eigen.getV();
+      svalues = eigen.getRealEigenvalues().clone();
+
+      svalues.assign(Functions.SQRT);
+
+      // save/redistribute UHat
+      fs.mkdirs(uHatPath);
+      DistributedRowMatrixWriter.write(uHatPath = new Path(uHatPath, "uhat.seq"),
+                                       conf,
+                                       uHat);
+
+      // save sigma.
+      SSVDHelper.saveVector(svalues,
+                            svPath = new Path(svPath, "svalues.seq"),
+                            conf);
+
+      UJob ujob = null;
+      if (computeU) {
+        ujob = new UJob();
+        ujob.run(conf,
+                 new Path(btPath, BtJob.OUTPUT_Q + "-*"),
+                 uHatPath,
+                 svPath,
+                 uPath,
+                 k,
+                 reduceTasks,
+                 labelType,
+                 OutputScalingEnum.NOSCALING);
+        // actually this is a map-only job anyway
+      }
+
+      UJob uhsjob = null;
+      if (cUHalfSigma) {
+        uhsjob = new UJob();
+        uhsjob.run(conf,
+                   new Path(btPath, BtJob.OUTPUT_Q + "-*"),
+                   uHatPath,
+                   svPath,
+                   uHalfSigmaPath,
+                   k,
+                   reduceTasks,
+                   labelType,
+                   OutputScalingEnum.HALFSIGMA);
+      }
+
+      UJob usjob = null;
+      if (cUSigma) {
+        usjob = new UJob();
+        usjob.run(conf,
+                  new Path(btPath, BtJob.OUTPUT_Q + "-*"),
+                  uHatPath,
+                  svPath,
+                  uSigmaPath,
+                  k,
+                  reduceTasks,
+                  labelType,
+                  OutputScalingEnum.SIGMA);
+      }
+
+      VJob vjob = null;
+      if (computeV) {
+        vjob = new VJob();
+        vjob.run(conf,
+                 new Path(btPath, BtJob.OUTPUT_BT + "-*"),
+                 pcaMeanPath,
+                 sqPath,
+                 uHatPath,
+                 svPath,
+                 vPath,
+                 k,
+                 reduceTasks,
+                 OutputScalingEnum.NOSCALING);
+      }
+
+      VJob vhsjob = null;
+      if (cVHalfSigma) {
+        vhsjob = new VJob();
+        vhsjob.run(conf,
+                   new Path(btPath, BtJob.OUTPUT_BT + "-*"),
+                   pcaMeanPath,
+                   sqPath,
+                   uHatPath,
+                   svPath,
+                   vHalfSigmaPath,
+                   k,
+                   reduceTasks,
+                   OutputScalingEnum.HALFSIGMA);
+      }
+
+      VJob vsjob = null;
+      if (cVSigma) {
+        vsjob = new VJob();
+        vsjob.run(conf,
+                  new Path(btPath, BtJob.OUTPUT_BT + "-*"),
+                  pcaMeanPath,
+                  sqPath,
+                  uHatPath,
+                  svPath,
+                  vSigmaPath,
+                  k,
+                  reduceTasks,
+                  OutputScalingEnum.SIGMA);
+      }
+
+      if (ujob != null) {
+        ujob.waitForCompletion();
+        this.uPath = uPath.toString();
+      }
+      if (uhsjob != null) {
+        uhsjob.waitForCompletion();
+        this.uHalfSigmaPath = uHalfSigmaPath.toString();
+      }
+      if (usjob != null) {
+        usjob.waitForCompletion();
+        this.uSigmaPath = uSigmaPath.toString();
+      }
+      if (vjob != null) {
+        vjob.waitForCompletion();
+        this.vPath = vPath.toString();
+      }
+      if (vhsjob != null) {
+        vhsjob.waitForCompletion();
+        this.vHalfSigmaPath = vHalfSigmaPath.toString();
+      }
+      if (vsjob != null) {
+        vsjob.waitForCompletion();
+        this.vSigmaPath = vSigmaPath.toString();
+      }
+
+    } catch (InterruptedException exc) {
+      throw new IOException("Interrupted", exc);
+    } catch (ClassNotFoundException exc) {
+      throw new IOException(exc);
+
+    } finally {
+      IOUtils.close(closeables);
+    }
+  }
+
+  enum OutputScalingEnum {
+    NOSCALING, SIGMA, HALFSIGMA
+  }
+}
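
For illustration, a minimal driver sketch of the use pattern documented in the
class javadoc above. It is not part of the patch; the input/output paths and all
parameter values are illustrative assumptions, not recommended defaults.

    Configuration conf = new Configuration();
    Path[] input = {new Path("/data/A")};   // DRM-compatible input
    Path output = new Path("/out/ssvd");

    SSVDSolver solver = new SSVDSolver(conf, input, output,
                                       30000 /* ablockRows */,
                                       50 /* k */, 15 /* p */,
                                       10 /* reduceTasks */);
    solver.setQ(1);             // one extra power iteration for precision
    solver.setOverwrite(true);  // clean the output folder if it exists
    solver.run();

    Vector sigma = solver.getSingularValues();  // k+p values, largest to smallest
    String uDir = solver.getUPath();            // m x k U matrix files, if computed
    String vDir = solver.getVPath();            // n x k V matrix files, if computed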

http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SparseRowBlockAccumulator.java
----------------------------------------------------------------------
diff --git 
a/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SparseRowBlockAccumulator.java
 
b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SparseRowBlockAccumulator.java
new file mode 100644
index 0000000..081f55a
--- /dev/null
+++ 
b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SparseRowBlockAccumulator.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.stochasticsvd;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.mahout.math.Vector;
+
+/**
+ * Aggregates incoming rows into blocks based on the row number (long). Rows can
+ * be sparse (meaning they perhaps come at big intervals) and don't even have to
+ * come in any order, but they should come in proximity, so that by the time we
+ * output a block key we have hopefully aggregated more than one row.
+ * <P>
+ *
+ * If the block is sufficiently large to fit all rows that the mapper may
+ * produce, it will never even hit a spill, as we would already be summing
+ * efficiently in the mapper.
+ * <P>
+ *
+ * Also, for sparse inputs this works especially well if the transposed columns
+ * of the left-side matrix and the corresponding rows of the right-side matrix
+ * are sparse in the same elements.
+ * <P>
+ *
+ */
+public class SparseRowBlockAccumulator implements
+    OutputCollector<Long, Vector>, Closeable {
+
+  private final int height;
+  private final OutputCollector<LongWritable, SparseRowBlockWritable> delegate;
+  private long currentBlockNum = -1;
+  private SparseRowBlockWritable block;
+  private final LongWritable blockKeyW = new LongWritable();
+
+  public SparseRowBlockAccumulator(int height,
+                                   OutputCollector<LongWritable, SparseRowBlockWritable> delegate) {
+    this.height = height;
+    this.delegate = delegate;
+  }
+
+  private void flushBlock() throws IOException {
+    if (block == null || block.getNumRows() == 0) {
+      return;
+    }
+    blockKeyW.set(currentBlockNum);
+    delegate.collect(blockKeyW, block);
+    block.clear();
+  }
+
+  @Override
+  public void collect(Long rowIndex, Vector v) throws IOException {
+
+    long blockKey = rowIndex / height;
+
+    if (blockKey != currentBlockNum) {
+      flushBlock();
+      if (block == null) {
+        block = new SparseRowBlockWritable(100);
+      }
+      currentBlockNum = blockKey;
+    }
+
+    block.plusRow((int) (rowIndex % height), v);
+  }
+
+  @Override
+  public void close() throws IOException {
+    flushBlock();
+  }
+
+}
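
For illustration, a small sketch of how a mapper-side caller might drive the
accumulator. It is not part of the patch; the delegate below just prints emitted
blocks (a real mapper would pass them to its output collector), the block height
and row indices are made up, and the usual Mahout math and Hadoop imports
(RandomAccessSparseVector, LongWritable, OutputCollector) are assumed.

    OutputCollector<LongWritable, SparseRowBlockWritable> delegate =
        new OutputCollector<LongWritable, SparseRowBlockWritable>() {
          @Override
          public void collect(LongWritable blockKey, SparseRowBlockWritable block) {
            System.out.println("block " + blockKey.get() + ": " + block.getNumRows() + " rows");
          }
        };

    try (SparseRowBlockAccumulator acc = new SparseRowBlockAccumulator(30000, delegate)) {
      Vector row = new RandomAccessSparseVector(1000);
      row.setQuick(7, 3.14);
      acc.collect(12345L, row);   // lands in block 0 (12345 / 30000), kept in RAM
      acc.collect(12346L, row);   // same block, just summed into it
      acc.collect(61000L, row);   // block 2 begins, so block 0 is flushed to the delegate
    }                             // close() flushes the last block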
