http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java new file mode 100644 index 0000000..1277bae --- /dev/null +++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java @@ -0,0 +1,628 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.math.hadoop.stochasticsvd; + +import org.apache.commons.lang3.Validate; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.filecache.DistributedCache; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.SequenceFile.CompressionType; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.compress.DefaultCodec; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.lib.MultipleOutputs; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.mahout.common.HadoopUtil; +import org.apache.mahout.common.IOUtils; +import org.apache.mahout.common.iterator.sequencefile.PathType; +import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterator; +import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterator; +import org.apache.mahout.math.DenseVector; +import org.apache.mahout.math.NamedVector; +import org.apache.mahout.math.UpperTriangular; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorWritable; +import org.apache.mahout.math.function.Functions; +import org.apache.mahout.math.function.PlusMult; +import org.apache.mahout.math.hadoop.stochasticsvd.qr.QRLastStep; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Deque; + +/** + * Bt job. For details, see working notes in MAHOUT-376. + * <p/> + * <p/> + * Uses hadoop deprecated API wherever new api has not been updated + * (MAHOUT-593), hence @SuppressWarning("deprecation"). 
+ * <p/> + * <p/> + * This job outputs either Bt in its standard output, or upper triangular + * matrices representing BBt partial sums if that's requested . If the latter + * mode is enabled, then we accumulate BBt outer product sums in upper + * triangular accumulator and output it at the end of the job, thus saving space + * and BBt job. + * <p/> + * <p/> + * This job also outputs Q and Bt and optionally BBt. Bt is output to standard + * job output (part-*) and Q and BBt use named multiple outputs. + * <p/> + * <p/> + */ +@SuppressWarnings("deprecation") +public final class BtJob { + + public static final String OUTPUT_Q = "Q"; + public static final String OUTPUT_BT = "part"; + public static final String OUTPUT_BBT = "bbt"; + public static final String OUTPUT_SQ = "sq"; + public static final String OUTPUT_SB = "sb"; + + public static final String PROP_QJOB_PATH = "ssvd.QJob.path"; + public static final String PROP_OUPTUT_BBT_PRODUCTS = + "ssvd.BtJob.outputBBtProducts"; + public static final String PROP_OUTER_PROD_BLOCK_HEIGHT = + "ssvd.outerProdBlockHeight"; + public static final String PROP_RHAT_BROADCAST = "ssvd.rhat.broadcast"; + public static final String PROP_XI_PATH = "ssvdpca.xi.path"; + public static final String PROP_NV = "ssvd.nv"; + + private BtJob() { + } + + public static class BtMapper extends + Mapper<Writable, VectorWritable, LongWritable, SparseRowBlockWritable> { + + private QRLastStep qr; + private final Deque<Closeable> closeables = new ArrayDeque<>(); + + private int blockNum; + private MultipleOutputs outputs; + private final VectorWritable qRowValue = new VectorWritable(); + private Vector btRow; + private SparseRowBlockAccumulator btCollector; + private Context mapContext; + private boolean nv; + + // pca stuff + private Vector sqAccum; + private boolean computeSq; + + /** + * We maintain A and QtHat inputs partitioned the same way, so we + * essentially are performing map-side merge here of A and QtHats except + * QtHat is stored not row-wise but block-wise. + */ + @Override + protected void map(Writable key, VectorWritable value, Context context) + throws IOException, InterruptedException { + + mapContext = context; + // output Bt outer products + Vector aRow = value.get(); + + Vector qRow = qr.next(); + int kp = qRow.size(); + + // make sure Qs are inheriting A row labels. + outputQRow(key, qRow, aRow); + + // MAHOUT-817 + if (computeSq) { + if (sqAccum == null) { + sqAccum = new DenseVector(kp); + } + sqAccum.assign(qRow, Functions.PLUS); + } + + if (btRow == null) { + btRow = new DenseVector(kp); + } + + if (!aRow.isDense()) { + for (Vector.Element el : aRow.nonZeroes()) { + double mul = el.get(); + for (int j = 0; j < kp; j++) { + btRow.setQuick(j, mul * qRow.getQuick(j)); + } + btCollector.collect((long) el.index(), btRow); + } + } else { + int n = aRow.size(); + for (int i = 0; i < n; i++) { + double mul = aRow.getQuick(i); + for (int j = 0; j < kp; j++) { + btRow.setQuick(j, mul * qRow.getQuick(j)); + } + btCollector.collect((long) i, btRow); + } + } + } + + @Override + protected void setup(Context context) throws IOException, + InterruptedException { + super.setup(context); + + Configuration conf = context.getConfiguration(); + + Path qJobPath = new Path(conf.get(PROP_QJOB_PATH)); + + /* + * actually this is kind of dangerous because this routine thinks we need + * to create file name for our current job and this will use -m- so it's + * just serendipity we are calling it from the mapper too as the QJob did. 
+ */ + Path qInputPath = + new Path(qJobPath, FileOutputFormat.getUniqueFile(context, + QJob.OUTPUT_QHAT, + "")); + blockNum = context.getTaskAttemptID().getTaskID().getId(); + + SequenceFileValueIterator<DenseBlockWritable> qhatInput = + new SequenceFileValueIterator<>(qInputPath, + true, + conf); + closeables.addFirst(qhatInput); + + /* + * read all r files _in order of task ids_, i.e. partitions (aka group + * nums). + * + * Note: if broadcast option is used, this comes from distributed cache + * files rather than hdfs path. + */ + + SequenceFileDirValueIterator<VectorWritable> rhatInput; + + boolean distributedRHat = conf.get(PROP_RHAT_BROADCAST) != null; + if (distributedRHat) { + + Path[] rFiles = HadoopUtil.getCachedFiles(conf); + + Validate.notNull(rFiles, + "no RHat files in distributed cache job definition"); + //TODO: this probably can be replaced w/ local fs makeQualified + Configuration lconf = new Configuration(); + lconf.set("fs.default.name", "file:///"); + + rhatInput = + new SequenceFileDirValueIterator<>(rFiles, + SSVDHelper.PARTITION_COMPARATOR, + true, + lconf); + + } else { + Path rPath = new Path(qJobPath, QJob.OUTPUT_RHAT + "-*"); + rhatInput = + new SequenceFileDirValueIterator<>(rPath, + PathType.GLOB, + null, + SSVDHelper.PARTITION_COMPARATOR, + true, + conf); + } + + Validate.isTrue(rhatInput.hasNext(), "Empty R-hat input!"); + + closeables.addFirst(rhatInput); + outputs = new MultipleOutputs(new JobConf(conf)); + closeables.addFirst(new IOUtils.MultipleOutputsCloseableAdapter(outputs)); + + qr = new QRLastStep(qhatInput, rhatInput, blockNum); + closeables.addFirst(qr); + /* + * it's so happens that current QRLastStep's implementation preloads R + * sequence into memory in the constructor so it's ok to close rhat input + * now. + */ + if (!rhatInput.hasNext()) { + closeables.remove(rhatInput); + rhatInput.close(); + } + + OutputCollector<LongWritable, SparseRowBlockWritable> btBlockCollector = + new OutputCollector<LongWritable, SparseRowBlockWritable>() { + + @Override + public void collect(LongWritable blockKey, + SparseRowBlockWritable block) throws IOException { + try { + mapContext.write(blockKey, block); + } catch (InterruptedException exc) { + throw new IOException("Interrupted.", exc); + } + } + }; + + btCollector = + new SparseRowBlockAccumulator(conf.getInt(PROP_OUTER_PROD_BLOCK_HEIGHT, + -1), btBlockCollector); + closeables.addFirst(btCollector); + + // MAHOUT-817 + computeSq = conf.get(PROP_XI_PATH) != null; + + // MAHOUT-1067 + nv = conf.getBoolean(PROP_NV, false); + + } + + @Override + protected void cleanup(Context context) throws IOException, + InterruptedException { + try { + if (sqAccum != null) { + /* + * hack: we will output sq partial sums with index -1 for summation. 
+ */ + SparseRowBlockWritable sbrw = new SparseRowBlockWritable(1); + sbrw.plusRow(0, sqAccum); + LongWritable lw = new LongWritable(-1); + context.write(lw, sbrw); + } + } finally { + IOUtils.close(closeables); + } + } + + @SuppressWarnings("unchecked") + private void outputQRow(Writable key, Vector qRow, Vector aRow) throws IOException { + if (nv && (aRow instanceof NamedVector)) { + qRowValue.set(new NamedVector(qRow, ((NamedVector) aRow).getName())); + } else { + qRowValue.set(qRow); + } + outputs.getCollector(OUTPUT_Q, null).collect(key, qRowValue); + } + } + + public static class OuterProductCombiner + extends + Reducer<Writable, SparseRowBlockWritable, Writable, SparseRowBlockWritable> { + + protected final SparseRowBlockWritable accum = new SparseRowBlockWritable(); + protected final Deque<Closeable> closeables = new ArrayDeque<>(); + protected int blockHeight; + + @Override + protected void setup(Context context) throws IOException, + InterruptedException { + blockHeight = + context.getConfiguration().getInt(PROP_OUTER_PROD_BLOCK_HEIGHT, -1); + } + + @Override + protected void reduce(Writable key, + Iterable<SparseRowBlockWritable> values, + Context context) throws IOException, + InterruptedException { + for (SparseRowBlockWritable bw : values) { + accum.plusBlock(bw); + } + context.write(key, accum); + accum.clear(); + } + + @Override + protected void cleanup(Context context) throws IOException, + InterruptedException { + + IOUtils.close(closeables); + } + } + + public static class OuterProductReducer + extends + Reducer<LongWritable, SparseRowBlockWritable, IntWritable, VectorWritable> { + + protected final SparseRowBlockWritable accum = new SparseRowBlockWritable(); + protected final Deque<Closeable> closeables = new ArrayDeque<>(); + + protected int blockHeight; + private boolean outputBBt; + private UpperTriangular mBBt; + private MultipleOutputs outputs; + private final IntWritable btKey = new IntWritable(); + private final VectorWritable btValue = new VectorWritable(); + + // MAHOUT-817 + private Vector xi; + private final PlusMult pmult = new PlusMult(0); + private Vector sbAccum; + + @Override + protected void setup(Context context) throws IOException, + InterruptedException { + + Configuration conf = context.getConfiguration(); + blockHeight = conf.getInt(PROP_OUTER_PROD_BLOCK_HEIGHT, -1); + + outputBBt = conf.getBoolean(PROP_OUPTUT_BBT_PRODUCTS, false); + + if (outputBBt) { + int k = conf.getInt(QJob.PROP_K, -1); + int p = conf.getInt(QJob.PROP_P, -1); + + Validate.isTrue(k > 0, "invalid k parameter"); + Validate.isTrue(p >= 0, "invalid p parameter"); + mBBt = new UpperTriangular(k + p); + + } + + String xiPathStr = conf.get(PROP_XI_PATH); + if (xiPathStr != null) { + xi = SSVDHelper.loadAndSumUpVectors(new Path(xiPathStr), conf); + if (xi == null) { + throw new IOException(String.format("unable to load mean path xi from %s.", + xiPathStr)); + } + } + + if (outputBBt || xi != null) { + outputs = new MultipleOutputs(new JobConf(conf)); + closeables.addFirst(new IOUtils.MultipleOutputsCloseableAdapter(outputs)); + } + + } + + @Override + protected void reduce(LongWritable key, + Iterable<SparseRowBlockWritable> values, + Context context) throws IOException, + InterruptedException { + + accum.clear(); + for (SparseRowBlockWritable bw : values) { + accum.plusBlock(bw); + } + + // MAHOUT-817: + if (key.get() == -1L) { + + Vector sq = accum.getRows()[0]; + + @SuppressWarnings("unchecked") + OutputCollector<IntWritable, VectorWritable> sqOut = + outputs.getCollector(OUTPUT_SQ, 
null); + + sqOut.collect(new IntWritable(0), new VectorWritable(sq)); + return; + } + + /* + * at this point, sum of rows should be in accum, so we just generate + * outer self product of it and add to BBt accumulator. + */ + + for (int k = 0; k < accum.getNumRows(); k++) { + Vector btRow = accum.getRows()[k]; + btKey.set((int) (key.get() * blockHeight + accum.getRowIndices()[k])); + btValue.set(btRow); + context.write(btKey, btValue); + + if (outputBBt) { + int kp = mBBt.numRows(); + // accumulate partial BBt sum + for (int i = 0; i < kp; i++) { + double vi = btRow.get(i); + if (vi != 0.0) { + for (int j = i; j < kp; j++) { + double vj = btRow.get(j); + if (vj != 0.0) { + mBBt.setQuick(i, j, mBBt.getQuick(i, j) + vi * vj); + } + } + } + } + } + + // MAHOUT-817 + if (xi != null) { + // code defensively against shortened xi + int btIndex = btKey.get(); + double xii = xi.size() > btIndex ? xi.getQuick(btIndex) : 0.0; + // compute s_b + pmult.setMultiplicator(xii); + if (sbAccum == null) { + sbAccum = new DenseVector(btRow.size()); + } + sbAccum.assign(btRow, pmult); + } + + } + } + + @Override + protected void cleanup(Context context) throws IOException, + InterruptedException { + + // if we output BBt instead of Bt then we need to do it. + try { + if (outputBBt) { + + @SuppressWarnings("unchecked") + OutputCollector<Writable, Writable> collector = + outputs.getCollector(OUTPUT_BBT, null); + + collector.collect(new IntWritable(), + new VectorWritable(new DenseVector(mBBt.getData()))); + } + + // MAHOUT-817 + if (sbAccum != null) { + @SuppressWarnings("unchecked") + OutputCollector<IntWritable, VectorWritable> collector = + outputs.getCollector(OUTPUT_SB, null); + + collector.collect(new IntWritable(), new VectorWritable(sbAccum)); + + } + } finally { + IOUtils.close(closeables); + } + + } + } + + public static void run(Configuration conf, + Path[] inputPathA, + Path inputPathQJob, + Path xiPath, + Path outputPath, + int minSplitSize, + int k, + int p, + int btBlockHeight, + int numReduceTasks, + boolean broadcast, + Class<? extends Writable> labelClass, + boolean outputBBtProducts) + throws ClassNotFoundException, InterruptedException, IOException { + + JobConf oldApiJob = new JobConf(conf); + + MultipleOutputs.addNamedOutput(oldApiJob, + OUTPUT_Q, + org.apache.hadoop.mapred.SequenceFileOutputFormat.class, + labelClass, + VectorWritable.class); + + if (outputBBtProducts) { + MultipleOutputs.addNamedOutput(oldApiJob, + OUTPUT_BBT, + org.apache.hadoop.mapred.SequenceFileOutputFormat.class, + IntWritable.class, + VectorWritable.class); + /* + * MAHOUT-1067: if we are asked to output BBT products then named vector + * names should be propagated to Q too so that UJob could pick them up + * from there. + */ + oldApiJob.setBoolean(PROP_NV, true); + } + if (xiPath != null) { + // compute pca -related stuff as well + MultipleOutputs.addNamedOutput(oldApiJob, + OUTPUT_SQ, + org.apache.hadoop.mapred.SequenceFileOutputFormat.class, + IntWritable.class, + VectorWritable.class); + MultipleOutputs.addNamedOutput(oldApiJob, + OUTPUT_SB, + org.apache.hadoop.mapred.SequenceFileOutputFormat.class, + IntWritable.class, + VectorWritable.class); + } + + /* + * HACK: we use old api multiple outputs since they are not available in the + * new api of either 0.20.2 or 0.20.203 but wrap it into a new api job so we + * can use new api interfaces. 
+ */ + + Job job = new Job(oldApiJob); + job.setJobName("Bt-job"); + job.setJarByClass(BtJob.class); + + job.setInputFormatClass(SequenceFileInputFormat.class); + job.setOutputFormatClass(SequenceFileOutputFormat.class); + FileInputFormat.setInputPaths(job, inputPathA); + if (minSplitSize > 0) { + FileInputFormat.setMinInputSplitSize(job, minSplitSize); + } + FileOutputFormat.setOutputPath(job, outputPath); + + // WARN: tight hadoop integration here: + job.getConfiguration().set("mapreduce.output.basename", OUTPUT_BT); + + FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class); + SequenceFileOutputFormat.setOutputCompressionType(job, + CompressionType.BLOCK); + + job.setMapOutputKeyClass(LongWritable.class); + job.setMapOutputValueClass(SparseRowBlockWritable.class); + + job.setOutputKeyClass(IntWritable.class); + job.setOutputValueClass(VectorWritable.class); + + job.setMapperClass(BtMapper.class); + job.setCombinerClass(OuterProductCombiner.class); + job.setReducerClass(OuterProductReducer.class); + + job.getConfiguration().setInt(QJob.PROP_K, k); + job.getConfiguration().setInt(QJob.PROP_P, p); + job.getConfiguration().set(PROP_QJOB_PATH, inputPathQJob.toString()); + job.getConfiguration().setBoolean(PROP_OUPTUT_BBT_PRODUCTS, + outputBBtProducts); + job.getConfiguration().setInt(PROP_OUTER_PROD_BLOCK_HEIGHT, btBlockHeight); + + job.setNumReduceTasks(numReduceTasks); + + /* + * PCA-related options, MAHOUT-817 + */ + if (xiPath != null) { + job.getConfiguration().set(PROP_XI_PATH, xiPath.toString()); + } + + /* + * we can broadcast Rhat files since all of them are required by each job, + * but not Q files which correspond to splits of A (so each split of A will + * require only a particular Q file, a different one each time). + */ + + if (broadcast) { + job.getConfiguration().set(PROP_RHAT_BROADCAST, "y"); + + FileSystem fs = FileSystem.get(inputPathQJob.toUri(), conf); + FileStatus[] fstats = + fs.globStatus(new Path(inputPathQJob, QJob.OUTPUT_RHAT + "-*")); + if (fstats != null) { + for (FileStatus fstat : fstats) { + /* + * new api is not enabled yet in our dependencies at this time, still + * using deprecated one + */ + DistributedCache.addCacheFile(fstat.getPath().toUri(), + job.getConfiguration()); + } + } + } + + job.submit(); + job.waitForCompletion(false); + + if (!job.isSuccessful()) { + throw new IOException("Bt job unsuccessful."); + } + } +}
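For reference, the arithmetic that BtMapper and OuterProductReducer distribute in the job above is the row-wise accumulation of B' = A' * Q: each row a_i of A contributes the outer product of a_i with the corresponding Q row q_i, and row j of B' (keyed by the A column index j) collects a_i[j] * q_i across all i. The standalone sketch below is not part of the commit; it restates that loop with plain arrays in place of Mahout vectors and Hadoop collectors.

public final class BtOuterProductSketch {
  private BtOuterProductSketch() {}

  /** Computes B' = A' * Q; a is m x n, q is m x (k+p), result is n x (k+p). */
  static double[][] bTranspose(double[][] a, double[][] q) {
    int m = a.length;
    int n = a[0].length;
    int kp = q[0].length;
    double[][] bt = new double[n][kp];
    for (int i = 0; i < m; i++) {        // one map() call per (A row, Q row) pair
      for (int j = 0; j < n; j++) {      // A column index j == B' row index (the emit key)
        double mul = a[i][j];
        if (mul == 0.0) {
          continue;                      // the mapper iterates only non-zeros of sparse rows
        }
        for (int c = 0; c < kp; c++) {
          bt[j][c] += mul * q[i][c];     // btRow = a_i[j] * q_i, summed by combiner/reducer
        }
      }
    }
    return bt;
  }
}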
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/DenseBlockWritable.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/DenseBlockWritable.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/DenseBlockWritable.java new file mode 100644 index 0000000..6a9b352 --- /dev/null +++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/DenseBlockWritable.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.mahout.math.hadoop.stochasticsvd; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Arrays; + +import org.apache.hadoop.io.Writable; + +/** + * Ad-hoc substitution for {@link org.apache.mahout.math.MatrixWritable}. + * Perhaps more useful for situations with mostly dense data (such as Q-blocks) + * but reduces GC by reusing the same block memory between loads and writes. + * <p> + * + * in case of Q blocks, it doesn't even matter if they this data is dense cause + * we need to unpack it into dense for fast access in computations anyway and + * even if it is not so dense the block compressor in sequence files will take + * care of it for the serialized size. + * <p> + */ +public class DenseBlockWritable implements Writable { + private double[][] block; + + public void setBlock(double[][] block) { + this.block = block; + } + + public double[][] getBlock() { + return block; + } + + @Override + public void readFields(DataInput in) throws IOException { + int m = in.readInt(); + int n = in.readInt(); + if (block == null) { + block = new double[m][0]; + } else if (block.length != m) { + block = Arrays.copyOf(block, m); + } + for (int i = 0; i < m; i++) { + if (block[i] == null || block[i].length != n) { + block[i] = new double[n]; + } + for (int j = 0; j < n; j++) { + block[i][j] = in.readDouble(); + } + + } + } + + @Override + public void write(DataOutput out) throws IOException { + int m = block.length; + int n = block.length == 0 ? 
0 : block[0].length; + + out.writeInt(m); + out.writeInt(n); + for (double[] aBlock : block) { + for (int j = 0; j < n; j++) { + out.writeDouble(aBlock[j]); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/Omega.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/Omega.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/Omega.java new file mode 100644 index 0000000..a5f32ad --- /dev/null +++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/Omega.java @@ -0,0 +1,257 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.math.hadoop.stochasticsvd; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import com.google.common.collect.Lists; +import org.apache.mahout.math.DenseVector; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.Vector.Element; + +/** + * simplistic implementation for Omega matrix in Stochastic SVD method + */ +public class Omega { + + private static final double UNIFORM_DIVISOR = Math.pow(2.0, 64); + + private final long seed; + private final int kp; + + public Omega(long seed, int kp) { + this.seed = seed; + this.kp = kp; + } + + /** + * Get omega element at (x,y) uniformly distributed within [-1...1) + * + * @param row + * omega row + * @param column + * omega column + */ + public double getQuick(int row, int column) { + long hash = murmur64((long) row << Integer.SIZE | column, 8, seed); + return hash / UNIFORM_DIVISOR; + } + + /** + * compute YRow=ARow*Omega. 
+ * + * @param aRow + * row of matrix A (size n) + * @param yRow + * row of matrix Y (result) must be pre-allocated to size of (k+p) + */ + @Deprecated + public void computeYRow(Vector aRow, double[] yRow) { + // assert yRow.length == kp; + Arrays.fill(yRow, 0.0); + if (aRow.isDense()) { + int n = aRow.size(); + for (int j = 0; j < n; j++) { + accumDots(j, aRow.getQuick(j), yRow); + } + } else { + for (Element el : aRow.nonZeroes()) { + accumDots(el.index(), el.get(), yRow); + } + } + } + + /** + * A version to compute yRow as a sparse vector in case of extremely sparse + * matrices + * + * @param aRow + * @param yRowOut + */ + public void computeYRow(Vector aRow, Vector yRowOut) { + yRowOut.assign(0.0); + if (aRow.isDense()) { + int n = aRow.size(); + for (int j = 0; j < n; j++) { + accumDots(j, aRow.getQuick(j), yRowOut); + } + } else { + for (Element el : aRow.nonZeroes()) { + accumDots(el.index(), el.get(), yRowOut); + } + } + } + + /* + * computes t(Omega) %*% v in multithreaded fashion + */ + public Vector mutlithreadedTRightMultiply(final Vector v) { + + int nThreads = Runtime.getRuntime().availableProcessors(); + ExecutorService es = + new ThreadPoolExecutor(nThreads, + nThreads, + 1, + TimeUnit.SECONDS, + new ArrayBlockingQueue<Runnable>(kp)); + + try { + + List<Future<Double>> dotFutures = Lists.newArrayListWithCapacity(kp); + + for (int i = 0; i < kp; i++) { + final int index = i; + + Future<Double> dotFuture = es.submit(new Callable<Double>() { + @Override + public Double call() throws Exception { + double result = 0.0; + if (v.isDense()) { + for (int k = 0; k < v.size(); k++) { + // it's ok, this is reentrant + result += getQuick(k, index) * v.getQuick(k); + } + + } else { + for (Element el : v.nonZeroes()) { + int k = el.index(); + result += getQuick(k, index) * el.get(); + } + } + return result; + } + }); + dotFutures.add(dotFuture); + } + + try { + Vector res = new DenseVector(kp); + for (int i = 0; i < kp; i++) { + res.setQuick(i, dotFutures.get(i).get()); + } + return res; + } catch (InterruptedException exc) { + throw new IllegalStateException("Interrupted", exc); + } catch (ExecutionException exc) { + if (exc.getCause() instanceof RuntimeException) { + throw (RuntimeException) exc.getCause(); + } else { + throw new IllegalStateException(exc.getCause()); + } + } + + } finally { + es.shutdown(); + } + } + + protected void accumDots(int aIndex, double aElement, double[] yRow) { + for (int i = 0; i < kp; i++) { + yRow[i] += getQuick(aIndex, i) * aElement; + } + } + + protected void accumDots(int aIndex, double aElement, Vector yRow) { + for (int i = 0; i < kp; i++) { + yRow.setQuick(i, yRow.getQuick(i) + getQuick(aIndex, i) * aElement); + } + } + + /** + * Shortened version for data < 8 bytes packed into {@code len} lowest bytes + * of {@code val}. 
+ * + * @param val + * the value + * @param len + * the length of data packed into this many low bytes of {@code val} + * @param seed + * the seed to use + * @return murmur hash + */ + public static long murmur64(long val, int len, long seed) { + + // assert len > 0 && len <= 8; + long m = 0xc6a4a7935bd1e995L; + long h = seed ^ len * m; + + long k = val; + + k *= m; + int r = 47; + k ^= k >>> r; + k *= m; + + h ^= k; + h *= m; + + h ^= h >>> r; + h *= m; + h ^= h >>> r; + return h; + } + + public static long murmur64(byte[] val, int offset, int len, long seed) { + + long m = 0xc6a4a7935bd1e995L; + int r = 47; + long h = seed ^ (len * m); + + int lt = len >>> 3; + for (int i = 0; i < lt; i++, offset += 8) { + long k = 0; + for (int j = 0; j < 8; j++) { + k <<= 8; + k |= val[offset + j] & 0xff; + } + + k *= m; + k ^= k >>> r; + k *= m; + + h ^= k; + h *= m; + } + + if (offset < len) { + long k = 0; + while (offset < len) { + k <<= 8; + k |= val[offset] & 0xff; + offset++; + } + h ^= k; + h *= m; + } + + h ^= h >>> r; + h *= m; + h ^= h >>> r; + return h; + + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/QJob.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/QJob.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/QJob.java new file mode 100644 index 0000000..76dc299 --- /dev/null +++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/QJob.java @@ -0,0 +1,237 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.math.hadoop.stochasticsvd; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Deque; + +import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.SequenceFile.CompressionType; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.compress.DefaultCodec; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.lib.MultipleOutputs; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.mahout.common.IOUtils; +import org.apache.mahout.math.DenseVector; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorWritable; +import org.apache.mahout.math.function.Functions; +import org.apache.mahout.math.hadoop.stochasticsvd.qr.QRFirstStep; + +/** + * Compute first level of QHat-transpose blocks. + * <P> + * + * See Mahout-376 working notes for details. + * <P> + * + * Uses some of Hadoop deprecated api wherever newer api is not available. + * Hence, @SuppressWarnings("deprecation") for imports (MAHOUT-593). + * <P> + * + */ +@SuppressWarnings("deprecation") +public final class QJob { + + public static final String PROP_OMEGA_SEED = "ssvd.omegaseed"; + public static final String PROP_K = QRFirstStep.PROP_K; + public static final String PROP_P = QRFirstStep.PROP_P; + public static final String PROP_SB_PATH = "ssvdpca.sb.path"; + public static final String PROP_AROWBLOCK_SIZE = + QRFirstStep.PROP_AROWBLOCK_SIZE; + + public static final String OUTPUT_RHAT = "R"; + public static final String OUTPUT_QHAT = "QHat"; + + private QJob() { + } + + public static class QMapper + extends + Mapper<Writable, VectorWritable, SplitPartitionedWritable, VectorWritable> { + + private MultipleOutputs outputs; + private final Deque<Closeable> closeables = Lists.newLinkedList(); + private SplitPartitionedWritable qHatKey; + private SplitPartitionedWritable rHatKey; + private Vector yRow; + private Vector sb; + private Omega omega; + private int kp; + + private QRFirstStep qr; + + @Override + protected void setup(Context context) throws IOException, + InterruptedException { + + Configuration conf = context.getConfiguration(); + int k = Integer.parseInt(conf.get(PROP_K)); + int p = Integer.parseInt(conf.get(PROP_P)); + kp = k + p; + long omegaSeed = Long.parseLong(conf.get(PROP_OMEGA_SEED)); + omega = new Omega(omegaSeed, k + p); + + String sbPathStr = conf.get(PROP_SB_PATH); + if (sbPathStr != null) { + sb = SSVDHelper.loadAndSumUpVectors(new Path(sbPathStr), conf); + if (sb == null) + throw new IOException(String.format("Unable to load s_omega from path %s.", sbPathStr)); + } + + outputs = new MultipleOutputs(new JobConf(conf)); + closeables.addFirst(new Closeable() { + @Override + public void close() throws IOException { + outputs.close(); + } + }); + + qHatKey = new SplitPartitionedWritable(context); + rHatKey = new SplitPartitionedWritable(context); + + OutputCollector<Writable, DenseBlockWritable> qhatCollector = + new OutputCollector<Writable, DenseBlockWritable>() { + + @Override + @SuppressWarnings("unchecked") + public void collect(Writable nil, 
DenseBlockWritable dbw) + throws IOException { + outputs.getCollector(OUTPUT_QHAT, null).collect(qHatKey, dbw); + qHatKey.incrementItemOrdinal(); + } + }; + + OutputCollector<Writable, VectorWritable> rhatCollector = + new OutputCollector<Writable, VectorWritable>() { + + @Override + @SuppressWarnings("unchecked") + public void collect(Writable nil, VectorWritable rhat) + throws IOException { + outputs.getCollector(OUTPUT_RHAT, null).collect(rHatKey, rhat); + rHatKey.incrementItemOrdinal(); + } + }; + + qr = new QRFirstStep(conf, qhatCollector, rhatCollector); + closeables.addFirst(qr); // important: qr closes first!! + yRow = new DenseVector(kp); + } + + @Override + protected void map(Writable key, VectorWritable value, Context context) + throws IOException, InterruptedException { + omega.computeYRow(value.get(), yRow); + if (sb != null) { + yRow.assign(sb, Functions.MINUS); + } + qr.collect(key, yRow); + } + + @Override + protected void cleanup(Context context) throws IOException, + InterruptedException { + IOUtils.close(closeables); + } + } + + public static void run(Configuration conf, + Path[] inputPaths, + Path sbPath, + Path outputPath, + int aBlockRows, + int minSplitSize, + int k, + int p, + long seed, + int numReduceTasks) throws ClassNotFoundException, + InterruptedException, IOException { + + JobConf oldApiJob = new JobConf(conf); + MultipleOutputs.addNamedOutput(oldApiJob, + OUTPUT_QHAT, + org.apache.hadoop.mapred.SequenceFileOutputFormat.class, + SplitPartitionedWritable.class, + DenseBlockWritable.class); + MultipleOutputs.addNamedOutput(oldApiJob, + OUTPUT_RHAT, + org.apache.hadoop.mapred.SequenceFileOutputFormat.class, + SplitPartitionedWritable.class, + VectorWritable.class); + + Job job = new Job(oldApiJob); + job.setJobName("Q-job"); + job.setJarByClass(QJob.class); + + job.setInputFormatClass(SequenceFileInputFormat.class); + FileInputFormat.setInputPaths(job, inputPaths); + if (minSplitSize > 0) { + FileInputFormat.setMinInputSplitSize(job, minSplitSize); + } + + FileOutputFormat.setOutputPath(job, outputPath); + + FileOutputFormat.setCompressOutput(job, true); + FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class); + SequenceFileOutputFormat.setOutputCompressionType(job, + CompressionType.BLOCK); + + job.setMapOutputKeyClass(SplitPartitionedWritable.class); + job.setMapOutputValueClass(VectorWritable.class); + + job.setOutputKeyClass(SplitPartitionedWritable.class); + job.setOutputValueClass(VectorWritable.class); + + job.setMapperClass(QMapper.class); + + job.getConfiguration().setInt(PROP_AROWBLOCK_SIZE, aBlockRows); + job.getConfiguration().setLong(PROP_OMEGA_SEED, seed); + job.getConfiguration().setInt(PROP_K, k); + job.getConfiguration().setInt(PROP_P, p); + if (sbPath != null) { + job.getConfiguration().set(PROP_SB_PATH, sbPath.toString()); + } + + /* + * number of reduce tasks doesn't matter. we don't actually send anything to + * reducers. 
+ */ + + job.setNumReduceTasks(0 /* numReduceTasks */); + + job.submit(); + job.waitForCompletion(false); + + if (!job.isSuccessful()) { + throw new IOException("Q job unsuccessful."); + } + + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java new file mode 100644 index 0000000..7b4fefb --- /dev/null +++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java @@ -0,0 +1,201 @@ +/* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.math.hadoop.stochasticsvd; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.ToolRunner; +import org.apache.mahout.common.AbstractJob; +import org.apache.mahout.common.HadoopUtil; +import org.apache.mahout.common.commandline.DefaultOptionCreator; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.hadoop.MatrixColumnMeansJob; + +/** + * Mahout CLI adapter for SSVDSolver + */ +public class SSVDCli extends AbstractJob { + + @Override + public int run(String[] args) throws Exception { + addInputOption(); + addOutputOption(); + addOption("rank", "k", "decomposition rank", true); + addOption("oversampling", "p", "oversampling", String.valueOf(15)); + addOption("blockHeight", + "r", + "Y block height (must be > (k+p))", + String.valueOf(10000)); + addOption("outerProdBlockHeight", + "oh", + "block height of outer products during multiplication, increase for sparse inputs", + String.valueOf(30000)); + addOption("abtBlockHeight", + "abth", + "block height of Y_i in ABtJob during AB' multiplication, increase for extremely sparse inputs", + String.valueOf(200000)); + addOption("minSplitSize", "s", "minimum split size", String.valueOf(-1)); + addOption("computeU", "U", "compute U (true/false)", String.valueOf(true)); + addOption("uHalfSigma", + "uhs", + "Compute U * Sigma^0.5", + String.valueOf(false)); + addOption("uSigma", "us", "Compute U * Sigma", String.valueOf(false)); + addOption("computeV", "V", "compute V (true/false)", String.valueOf(true)); + addOption("vHalfSigma", + "vhs", + "compute V * Sigma^0.5", + String.valueOf(false)); + addOption("reduceTasks", + "t", + "number of reduce tasks (where applicable)", + true); + addOption("powerIter", + "q", + "number of additional power iterations (0..2 is 
good)", + String.valueOf(0)); + addOption("broadcast", + "br", + "whether use distributed cache to broadcast matrices wherever possible", + String.valueOf(true)); + addOption("pca", + "pca", + "run in pca mode: compute column-wise mean and subtract from input", + String.valueOf(false)); + addOption("pcaOffset", + "xi", + "path(glob) of external pca mean (optional, dont compute, use external mean"); + addOption(DefaultOptionCreator.overwriteOption().create()); + + Map<String, List<String>> pargs = parseArguments(args); + if (pargs == null) { + return -1; + } + + int k = Integer.parseInt(getOption("rank")); + int p = Integer.parseInt(getOption("oversampling")); + int r = Integer.parseInt(getOption("blockHeight")); + int h = Integer.parseInt(getOption("outerProdBlockHeight")); + int abh = Integer.parseInt(getOption("abtBlockHeight")); + int q = Integer.parseInt(getOption("powerIter")); + int minSplitSize = Integer.parseInt(getOption("minSplitSize")); + boolean computeU = Boolean.parseBoolean(getOption("computeU")); + boolean computeV = Boolean.parseBoolean(getOption("computeV")); + boolean cUHalfSigma = Boolean.parseBoolean(getOption("uHalfSigma")); + boolean cUSigma = Boolean.parseBoolean(getOption("uSigma")); + boolean cVHalfSigma = Boolean.parseBoolean(getOption("vHalfSigma")); + int reduceTasks = Integer.parseInt(getOption("reduceTasks")); + boolean broadcast = Boolean.parseBoolean(getOption("broadcast")); + String xiPathStr = getOption("pcaOffset"); + Path xiPath = xiPathStr == null ? null : new Path(xiPathStr); + boolean pca = Boolean.parseBoolean(getOption("pca")) || xiPath != null; + + boolean overwrite = hasOption(DefaultOptionCreator.OVERWRITE_OPTION); + + Configuration conf = getConf(); + if (conf == null) { + throw new IOException("No Hadoop configuration present"); + } + + Path[] inputPaths = { getInputPath() }; + Path tempPath = getTempPath(); + FileSystem fs = FileSystem.get(getTempPath().toUri(), conf); + + // housekeeping + if (overwrite) { + // clear the output path + HadoopUtil.delete(getConf(), getOutputPath()); + // clear the temp path + HadoopUtil.delete(getConf(), getTempPath()); + } + + fs.mkdirs(getOutputPath()); + + // MAHOUT-817 + if (pca && xiPath == null) { + xiPath = new Path(tempPath, "xi"); + if (overwrite) { + fs.delete(xiPath, true); + } + MatrixColumnMeansJob.run(conf, inputPaths[0], xiPath); + } + + SSVDSolver solver = + new SSVDSolver(conf, + inputPaths, + new Path(tempPath, "ssvd"), + r, + k, + p, + reduceTasks); + + solver.setMinSplitSize(minSplitSize); + solver.setComputeU(computeU); + solver.setComputeV(computeV); + solver.setcUHalfSigma(cUHalfSigma); + solver.setcVHalfSigma(cVHalfSigma); + solver.setcUSigma(cUSigma); + solver.setOuterBlockHeight(h); + solver.setAbtBlockHeight(abh); + solver.setQ(q); + solver.setBroadcast(broadcast); + solver.setOverwrite(overwrite); + + if (xiPath != null) { + solver.setPcaMeanPath(new Path(xiPath, "part-*")); + } + + solver.run(); + + Vector svalues = solver.getSingularValues().viewPart(0, k); + SSVDHelper.saveVector(svalues, getOutputPath("sigma"), conf); + + if (computeU && !fs.rename(new Path(solver.getUPath()), getOutputPath())) { + throw new IOException("Unable to move U results to the output path."); + } + if (cUHalfSigma + && !fs.rename(new Path(solver.getuHalfSigmaPath()), getOutputPath())) { + throw new IOException("Unable to move U*Sigma^0.5 results to the output path."); + } + if (cUSigma + && !fs.rename(new Path(solver.getuSigmaPath()), getOutputPath())) { + throw new IOException("Unable to move U*Sigma 
results to the output path."); + } + if (computeV && !fs.rename(new Path(solver.getVPath()), getOutputPath())) { + throw new IOException("Unable to move V results to the output path."); + } + if (cVHalfSigma + && !fs.rename(new Path(solver.getvHalfSigmaPath()), getOutputPath())) { + throw new IOException("Unable to move V*Sigma^0.5 results to the output path."); + } + + // Delete the temp path on exit + fs.deleteOnExit(getTempPath()); + + return 0; + } + + public static void main(String[] args) throws Exception { + ToolRunner.run(new SSVDCli(), args); + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDHelper.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDHelper.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDHelper.java new file mode 100644 index 0000000..c585f33 --- /dev/null +++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDHelper.java @@ -0,0 +1,322 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.math.hadoop.stochasticsvd; + +import com.google.common.base.Function; +import com.google.common.collect.Iterators; +import com.google.common.io.Closeables; +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Deque; +import java.util.Iterator; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Writable; +import org.apache.mahout.common.IOUtils; +import org.apache.mahout.common.Pair; +import org.apache.mahout.common.iterator.sequencefile.PathFilters; +import org.apache.mahout.common.iterator.sequencefile.PathType; +import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterator; +import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterator; +import org.apache.mahout.math.DenseMatrix; +import org.apache.mahout.math.DenseSymmetricMatrix; +import org.apache.mahout.math.DenseVector; +import org.apache.mahout.math.Matrix; +import org.apache.mahout.math.UpperTriangular; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorWritable; +import org.apache.mahout.math.function.Functions; + +/** + * set of small file manipulation helpers. 
+ */ +public final class SSVDHelper { + + private static final Pattern OUTPUT_FILE_PATTERN = Pattern.compile("(\\w+)-(m|r)-(\\d+)(\\.\\w+)?"); + + private SSVDHelper() { + } + + /** + * load single vector from an hdfs file (possibly presented as glob). + */ + static Vector loadVector(Path glob, Configuration conf) throws IOException { + + SequenceFileDirValueIterator<VectorWritable> iter = + new SequenceFileDirValueIterator<>(glob, + PathType.GLOB, + null, + null, + true, + conf); + + try { + if (!iter.hasNext()) { + throw new IOException("Empty input while reading vector"); + } + VectorWritable vw = iter.next(); + + if (iter.hasNext()) { + throw new IOException("Unexpected data after the end of vector file"); + } + + return vw.get(); + + } finally { + Closeables.close(iter, true); + } + } + + /** + * save single vector into hdfs file. + * + * @param v vector to save + */ + public static void saveVector(Vector v, + Path vectorFilePath, + Configuration conf) throws IOException { + VectorWritable vw = new VectorWritable(v); + FileSystem fs = FileSystem.get(conf); + try (SequenceFile.Writer w = new SequenceFile.Writer(fs, + conf, + vectorFilePath, + IntWritable.class, + VectorWritable.class)) { + w.append(new IntWritable(), vw); + } + /* + * this is a writer, no quiet close please. we must bail out on incomplete + * close. + */ + + } + + /** + * sniff label type in the input files + */ + static Class<? extends Writable> sniffInputLabelType(Path[] inputPath, + Configuration conf) + throws IOException { + FileSystem fs = FileSystem.get(conf); + for (Path p : inputPath) { + FileStatus[] fstats = fs.globStatus(p); + if (fstats == null || fstats.length == 0) { + continue; + } + + FileStatus firstSeqFile; + if (fstats[0].isDir()) { + firstSeqFile = fs.listStatus(fstats[0].getPath(), PathFilters.logsCRCFilter())[0]; + } else { + firstSeqFile = fstats[0]; + } + + SequenceFile.Reader r = null; + try { + r = new SequenceFile.Reader(fs, firstSeqFile.getPath(), conf); + return r.getKeyClass().asSubclass(Writable.class); + } finally { + Closeables.close(r, true); + } + } + throw new IOException("Unable to open input files to determine input label type."); + } + + static final Comparator<FileStatus> PARTITION_COMPARATOR = + new Comparator<FileStatus>() { + private final Matcher matcher = OUTPUT_FILE_PATTERN.matcher(""); + + @Override + public int compare(FileStatus o1, FileStatus o2) { + matcher.reset(o1.getPath().getName()); + if (!matcher.matches()) { + throw new IllegalArgumentException("Unexpected file name, unable to deduce partition #:" + + o1.getPath()); + } + int p1 = Integer.parseInt(matcher.group(3)); + matcher.reset(o2.getPath().getName()); + if (!matcher.matches()) { + throw new IllegalArgumentException("Unexpected file name, unable to deduce partition #:" + + o2.getPath()); + } + + int p2 = Integer.parseInt(matcher.group(3)); + return p1 - p2; + } + + }; + + public static Iterator<Pair<Writable, Vector>> drmIterator(FileSystem fs, Path glob, Configuration conf, + Deque<Closeable> closeables) + throws IOException { + SequenceFileDirIterator<Writable, VectorWritable> ret = + new SequenceFileDirIterator<>(glob, + PathType.GLOB, + PathFilters.logsCRCFilter(), + PARTITION_COMPARATOR, + true, + conf); + closeables.addFirst(ret); + return Iterators.transform(ret, new Function<Pair<Writable, VectorWritable>, Pair<Writable, Vector>>() { + @Override + public Pair<Writable, Vector> apply(Pair<Writable, VectorWritable> p) { + return new Pair(p.getFirst(), p.getSecond().get()); + } + }); + } + + /** + * 
helper capabiltiy to load distributed row matrices into dense matrix (to + * support tests mainly). + * + * @param fs filesystem + * @param glob FS glob + * @param conf configuration + * @return Dense matrix array + */ + public static DenseMatrix drmLoadAsDense(FileSystem fs, Path glob, Configuration conf) throws IOException { + + Deque<Closeable> closeables = new ArrayDeque<>(); + try { + List<double[]> denseData = new ArrayList<>(); + for (Iterator<Pair<Writable, Vector>> iter = drmIterator(fs, glob, conf, closeables); + iter.hasNext(); ) { + Pair<Writable, Vector> p = iter.next(); + Vector v = p.getSecond(); + double[] dd = new double[v.size()]; + if (v.isDense()) { + for (int i = 0; i < v.size(); i++) { + dd[i] = v.getQuick(i); + } + } else { + for (Vector.Element el : v.nonZeroes()) { + dd[el.index()] = el.get(); + } + } + denseData.add(dd); + } + if (denseData.size() == 0) { + return null; + } else { + return new DenseMatrix(denseData.toArray(new double[denseData.size()][])); + } + } finally { + IOUtils.close(closeables); + } + } + + /** + * Load multiple upper triangular matrices and sum them up. + * + * @return the sum of upper triangular inputs. + */ + public static DenseSymmetricMatrix loadAndSumUpperTriangularMatricesAsSymmetric(Path glob, Configuration conf) throws IOException { + Vector v = loadAndSumUpVectors(glob, conf); + return v == null ? null : new DenseSymmetricMatrix(v); + } + + /** + * @return sum of all vectors in different files specified by glob + */ + public static Vector loadAndSumUpVectors(Path glob, Configuration conf) + throws IOException { + + SequenceFileDirValueIterator<VectorWritable> iter = + new SequenceFileDirValueIterator<>(glob, + PathType.GLOB, + null, + PARTITION_COMPARATOR, + true, + conf); + + try { + Vector v = null; + while (iter.hasNext()) { + if (v == null) { + v = new DenseVector(iter.next().get()); + } else { + v.assign(iter.next().get(), Functions.PLUS); + } + } + return v; + + } finally { + Closeables.close(iter, true); + } + + } + + /** + * Load only one upper triangular matrix and issue error if mroe than one is + * found. + */ + public static UpperTriangular loadUpperTriangularMatrix(Path glob, Configuration conf) throws IOException { + + /* + * there still may be more than one file in glob and only one of them must + * contain the matrix. + */ + + try (SequenceFileDirValueIterator<VectorWritable> iter = new SequenceFileDirValueIterator<>(glob, + PathType.GLOB, + null, + null, + true, + conf)) { + if (!iter.hasNext()) { + throw new IOException("No triangular matrices found"); + } + Vector v = iter.next().get(); + UpperTriangular result = new UpperTriangular(v); + if (iter.hasNext()) { + throw new IOException("Unexpected overrun in upper triangular matrix files"); + } + return result; + + } + } + + /** + * extracts row-wise raw data from a Mahout matrix for 3rd party solvers. + * Unfortunately values member is 100% encapsulated in {@link org.apache.mahout.math.DenseMatrix} at + * this point, so we have to resort to abstract element-wise copying. 
+ */ + public static double[][] extractRawData(Matrix m) { + int rows = m.numRows(); + int cols = m.numCols(); + double[][] result = new double[rows][]; + for (int i = 0; i < rows; i++) { + result[i] = new double[cols]; + for (int j = 0; j < cols; j++) { + result[i][j] = m.getQuick(i, j); + } + } + return result; + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDSolver.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDSolver.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDSolver.java new file mode 100644 index 0000000..94be450 --- /dev/null +++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDSolver.java @@ -0,0 +1,662 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.math.hadoop.stochasticsvd; + +import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Writable; +import org.apache.mahout.common.IOUtils; +import org.apache.mahout.common.RandomUtils; +import org.apache.mahout.math.*; +import org.apache.mahout.math.function.Functions; +import org.apache.mahout.math.solver.EigenDecomposition; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Deque; +import java.util.Random; + +/** + * Stochastic SVD solver (API class). + * <p/> + * <p/> + * Implementation details are in my working notes in MAHOUT-376 + * (https://issues.apache.org/jira/browse/MAHOUT-376). + * <p/> + * <p/> + * As of the time of this writing, I don't have benchmarks for this method in + * comparison to other methods. However, non-hadoop differentiating + * characteristics of this method are thought to be : + * <LI>"faster" and precision is traded off in favor of speed. However, there's + * lever in terms of "oversampling parameter" p. Higher values of p produce + * better precision but are trading off speed (and minimum RAM requirement). + * This also means that this method is almost guaranteed to be less precise than + * Lanczos unless full rank SVD decomposition is sought. + * <LI>"more scale" -- can presumably take on larger problems than Lanczos one + * (not confirmed by benchmark at this time) + * <p/> + * <p/> + * <p/> + * Specifically in regards to this implementation, <i>I think</i> couple of + * other differentiating points are: + * <LI>no need to specify input matrix height or width in command line, it is + * what it gets to be. 
+ * <LI>supports any Writable as DRM row keys and copies them to the
+ * corresponding rows of the U matrix;
+ * <LI>can request U or V or U<sub>σ</sub>=U*Σ<sup>0.5</sup> or
+ * V<sub>σ</sub>=V*Σ<sup>0.5</sup>, none of which requires a pass
+ * over input A, and these jobs are parallel map-only jobs.
+ * <p/>
+ * <p/>
+ * <p/>
+ * This class is the central public API for the SSVD solver. The use pattern is
+ * as follows:
+ * <p/>
+ * <UL>
+ * <LI>create the solver using the constructor, supplying the computation
+ * parameters.
+ * <LI>set optional parameters through setter methods.
+ * <LI>call {@link #run()}.
+ * <LI> {@link #getUPath()} (if computed) returns the path to the directory
+ * containing the m x k U matrix file(s).
+ * <LI> {@link #getVPath()} (if computed) returns the path to the directory
+ * containing the n x k V matrix file(s).
+ * <p/>
+ * </UL>
+ */
+public final class SSVDSolver {
+
+  private Vector svalues;
+  private boolean computeU = true;
+  private boolean computeV = true;
+  private String uPath;
+  private String vPath;
+  private String uSigmaPath;
+  private String uHalfSigmaPath;
+  private String vSigmaPath;
+  private String vHalfSigmaPath;
+  private int outerBlockHeight = 30000;
+  private int abtBlockHeight = 200000;
+
+  // configured stuff
+  private final Configuration conf;
+  private final Path[] inputPath;
+  private final Path outputPath;
+  private final int ablockRows;
+  private final int k;
+  private final int p;
+  private int q;
+  private final int reduceTasks;
+  private int minSplitSize = -1;
+  private boolean cUHalfSigma;
+  private boolean cUSigma;
+  private boolean cVHalfSigma;
+  private boolean cVSigma;
+  private boolean overwrite;
+  private boolean broadcast = true;
+  private Path pcaMeanPath;
+
+  // for debugging
+  private long omegaSeed;
+
+  /**
+   * Create a new SSVD solver. Required parameters are passed to the constructor
+   * to ensure they are set. Optional parameters can be set using setters.
+   * <p/>
+   *
+   * @param conf        hadoop configuration
+   * @param inputPath   Input path (should be compatible with DistributedRowMatrix as of
+   *                    the time of this writing).
+   * @param outputPath  Output path containing U, V and the singular values vector files.
+   * @param ablockRows  The vertical height of a q-block (bigger values require more memory
+   *                    in mappers and perhaps larger {@code minSplitSize} values).
+   * @param k           desired rank
+   * @param p           SSVD oversampling parameter
+   * @param reduceTasks Number of reduce tasks (where applicable)
+   */
+  public SSVDSolver(Configuration conf,
+                    Path[] inputPath,
+                    Path outputPath,
+                    int ablockRows,
+                    int k,
+                    int p,
+                    int reduceTasks) {
+    this.conf = conf;
+    this.inputPath = inputPath;
+    this.outputPath = outputPath;
+    this.ablockRows = ablockRows;
+    this.k = k;
+    this.p = p;
+    this.reduceTasks = reduceTasks;
+  }
+
+  public int getQ() {
+    return q;
+  }
+
+  /**
+   * Sets q, the number of additional power iterations used to increase
+   * precision (typically 0..2). Defaults to 0.
+   *
+   * @param q number of additional power iterations
+   */
+  public void setQ(int q) {
+    this.q = q;
+  }
+
+  /**
+   * Controls whether to compute the U matrix of the low-rank SSVD.
+   * Default is true.
+   */
+  public void setComputeU(boolean val) {
+    computeU = val;
+  }
+
+  /**
+   * Controls whether to compute the V matrix of the low-rank SSVD.
+   *
+   * @param val true if we want to output the V matrix. Default is true.
+   */
+  public void setComputeV(boolean val) {
+    computeV = val;
+  }
+
+  /**
+   * @param cUHat whether to produce U*Sigma^0.5 as well (default false)
+   */
+  public void setcUHalfSigma(boolean cUHat) {
+    this.cUHalfSigma = cUHat;
+  }
+
+  /**
+   * @param cVHat whether to produce V*Sigma^0.5 as well (default false)
+   */
+  public void setcVHalfSigma(boolean cVHat) {
+    this.cVHalfSigma = cVHat;
+  }
+
+  /**
+   * @param cUSigma whether to produce U*Sigma output as well (default false)
+   */
+  public void setcUSigma(boolean cUSigma) {
+    this.cUSigma = cUSigma;
+  }
+
+  /**
+   * @param cVSigma whether to produce V*Sigma output as well (default false)
+   */
+  public void setcVSigma(boolean cVSigma) {
+    this.cVSigma = cVSigma;
+  }
+
+  /**
+   * Sometimes, if the requested A blocks become larger than a split, we may
+   * need to use this setting to ensure that at least k+p rows of A get into a
+   * single split. This is necessary to obtain orthonormalized Q blocks of SSVD.
+   *
+   * @param size the minimum split size to use
+   */
+  public void setMinSplitSize(int size) {
+    minSplitSize = size;
+  }
+
+  /**
+   * This contains the k+p singular values resulting from the solver run.
+   *
+   * @return singular values (largest to smallest)
+   */
+  public Vector getSingularValues() {
+    return svalues;
+  }
+
+  /**
+   * Returns the U path (if the computation was requested and successful).
+   *
+   * @return U output hdfs path, or null if computation was not completed for
+   *         whatever reason.
+   */
+  public String getUPath() {
+    return uPath;
+  }
+
+  /**
+   * Returns the V path (if the computation was requested and successful).
+   *
+   * @return V output hdfs path, or null if computation was not completed for
+   *         whatever reason.
+   */
+  public String getVPath() {
+    return vPath;
+  }
+
+  public String getuSigmaPath() {
+    return uSigmaPath;
+  }
+
+  public String getuHalfSigmaPath() {
+    return uHalfSigmaPath;
+  }
+
+  public String getvSigmaPath() {
+    return vSigmaPath;
+  }
+
+  public String getvHalfSigmaPath() {
+    return vHalfSigmaPath;
+  }
+
+  public boolean isOverwrite() {
+    return overwrite;
+  }
+
+  /**
+   * If true, the driver cleans the output folder first if it exists.
+   *
+   * @param overwrite true to overwrite the output folder
+   */
+  public void setOverwrite(boolean overwrite) {
+    this.overwrite = overwrite;
+  }
+
+  public int getOuterBlockHeight() {
+    return outerBlockHeight;
+  }
+
+  /**
+   * The height of outer blocks during Q'A multiplication. Higher values produce
+   * fewer keys for combining, shuffle and sort, therefore somewhat improving
+   * running time; but they require larger blocks to be formed in RAM (so
+   * setting this too high can lead to OOM).
+   *
+   * @param outerBlockHeight the outer block height to use
+   */
+  public void setOuterBlockHeight(int outerBlockHeight) {
+    this.outerBlockHeight = outerBlockHeight;
+  }
+
+  public int getAbtBlockHeight() {
+    return abtBlockHeight;
+  }
+
+  /**
+   * The block height of Y_i during power iterations. It is probably important
+   * to set it higher than the default 200,000 for extremely sparse inputs and
+   * when more RAM is available. The Y_i block in the ABt job would occupy
+   * approximately abtBlockHeight x (k+p) x sizeof(double) (as dense).
+   *
+   * @param abtBlockHeight the Y_i block height to use
+   */
+  public void setAbtBlockHeight(int abtBlockHeight) {
+    this.abtBlockHeight = abtBlockHeight;
+  }
+
+  public boolean isBroadcast() {
+    return broadcast;
+  }
+
+  /**
+   * If this property is true, use the DistributedCache mechanism to broadcast
+   * some data around. May improve efficiency. Default is true.
+   *
+   * @param broadcast whether to use the DistributedCache broadcast
+   */
+  public void setBroadcast(boolean broadcast) {
+    this.broadcast = broadcast;
+  }
+
+  /**
+   * Optional. Single-vector file path for a vector (aka xi in the MAHOUT-817
+   * working notes) to be subtracted from each row of input.
+   * <p/>
+   * <p/>
+   * A brute-force approach would turn the input into a dense input, which is
+   * often not desirable. By supplying this offset to the SSVD solver, we can
+   * avoid most of that overhead due to increased input density.
+   * <p/>
+   * <p/>
+   * The vector size for this offset is n (the width of the A input). In PCA and
+   * R this is known as the "column means", but in this case it can be any
+   * offset of the row vectors to propagate into the SSVD solution.
+   * <p/>
+   */
+  public Path getPcaMeanPath() {
+    return pcaMeanPath;
+  }
+
+  public void setPcaMeanPath(Path pcaMeanPath) {
+    this.pcaMeanPath = pcaMeanPath;
+  }
+
+  long getOmegaSeed() {
+    return omegaSeed;
+  }
+
+  /**
+   * Run all SSVD jobs.
+   *
+   * @throws IOException if an I/O error occurs.
+   */
+  public void run() throws IOException {
+
+    Deque<Closeable> closeables = Lists.newLinkedList();
+    try {
+      Class<? extends Writable> labelType =
+        SSVDHelper.sniffInputLabelType(inputPath, conf);
+      FileSystem fs = FileSystem.get(conf);
+
+      Path qPath = new Path(outputPath, "Q-job");
+      Path btPath = new Path(outputPath, "Bt-job");
+      Path uHatPath = new Path(outputPath, "UHat");
+      Path svPath = new Path(outputPath, "Sigma");
+      Path uPath = new Path(outputPath, "U");
+      Path uSigmaPath = new Path(outputPath, "USigma");
+      Path uHalfSigmaPath = new Path(outputPath, "UHalfSigma");
+      Path vPath = new Path(outputPath, "V");
+      Path vHalfSigmaPath = new Path(outputPath, "VHalfSigma");
+      Path vSigmaPath = new Path(outputPath, "VSigma");
+
+      Path pcaBasePath = new Path(outputPath, "pca");
+
+      if (overwrite) {
+        fs.delete(outputPath, true);
+      }
+
+      if (pcaMeanPath != null) {
+        fs.mkdirs(pcaBasePath);
+      }
+      Random rnd = RandomUtils.getRandom();
+      omegaSeed = rnd.nextLong();
+
+      Path sbPath = null;
+      double xisquaredlen = 0.0;
+      if (pcaMeanPath != null) {
+        /*
+         * compute s_b0 if the pca offset is present.
+         *
+         * Just in case, we treat the xi path as a possible reduce or otherwise
+         * multiple-task output whose partial components we assume we need to
+         * sum up. If it is just one file, it will work too.
+         */
+
+        Vector xi = SSVDHelper.loadAndSumUpVectors(pcaMeanPath, conf);
+        if (xi == null) {
+          throw new IOException(String.format("unable to load mean path xi from %s.",
+                                              pcaMeanPath.toString()));
+        }
+
+        xisquaredlen = xi.dot(xi);
+        Omega omega = new Omega(omegaSeed, k + p);
+        Vector s_b0 = omega.mutlithreadedTRightMultiply(xi);
+
+        SSVDHelper.saveVector(s_b0, sbPath = new Path(pcaBasePath, "somega.seq"), conf);
+      }
+
+      /*
+       * if we work with a pca offset, we need to precompute s_bq0 aka s_omega
+       * for the jobs to use.
+       */
+
+      QJob.run(conf,
+               inputPath,
+               sbPath,
+               qPath,
+               ablockRows,
+               minSplitSize,
+               k,
+               p,
+               omegaSeed,
+               reduceTasks);
+
+      /*
+       * restrict the number of reducers to a reasonable number so we don't have
+       * to run too many additions in the frontend when reconstructing BBt for
+       * the last B' and BB' computations. The user may not realize this and
+       * give a few too many (I would be happy if that were ever the case,
+       * though).
+       */
+
+      BtJob.run(conf,
+                inputPath,
+                qPath,
+                pcaMeanPath,
+                btPath,
+                minSplitSize,
+                k,
+                p,
+                outerBlockHeight,
+                q <= 0 ?
Math.min(1000, reduceTasks) : reduceTasks, + broadcast, + labelType, + q <= 0); + + sbPath = new Path(btPath, BtJob.OUTPUT_SB + "-*"); + Path sqPath = new Path(btPath, BtJob.OUTPUT_SQ + "-*"); + + // power iterations + for (int i = 0; i < q; i++) { + + qPath = new Path(outputPath, String.format("ABt-job-%d", i + 1)); + Path btPathGlob = new Path(btPath, BtJob.OUTPUT_BT + "-*"); + ABtDenseOutJob.run(conf, + inputPath, + btPathGlob, + pcaMeanPath, + sqPath, + sbPath, + qPath, + ablockRows, + minSplitSize, + k, + p, + abtBlockHeight, + reduceTasks, + broadcast); + + btPath = new Path(outputPath, String.format("Bt-job-%d", i + 1)); + + BtJob.run(conf, + inputPath, + qPath, + pcaMeanPath, + btPath, + minSplitSize, + k, + p, + outerBlockHeight, + i == q - 1 ? Math.min(1000, reduceTasks) : reduceTasks, + broadcast, + labelType, + i == q - 1); + sbPath = new Path(btPath, BtJob.OUTPUT_SB + "-*"); + sqPath = new Path(btPath, BtJob.OUTPUT_SQ + "-*"); + } + + DenseSymmetricMatrix bbt = + SSVDHelper.loadAndSumUpperTriangularMatricesAsSymmetric(new Path(btPath, + BtJob.OUTPUT_BBT + + "-*"), conf); + + // convert bbt to something our eigensolver could understand + assert bbt.columnSize() == k + p; + + /* + * we currently use a 3rd party in-core eigensolver. So we need just a + * dense array representation for it. + */ + Matrix bbtSquare = new DenseMatrix(k + p, k + p); + bbtSquare.assign(bbt); + + // MAHOUT-817 + if (pcaMeanPath != null) { + Vector sq = SSVDHelper.loadAndSumUpVectors(sqPath, conf); + Vector sb = SSVDHelper.loadAndSumUpVectors(sbPath, conf); + Matrix mC = sq.cross(sb); + + bbtSquare.assign(mC, Functions.MINUS); + bbtSquare.assign(mC.transpose(), Functions.MINUS); + + Matrix outerSq = sq.cross(sq); + outerSq.assign(Functions.mult(xisquaredlen)); + bbtSquare.assign(outerSq, Functions.PLUS); + + } + + EigenDecomposition eigen = new EigenDecomposition(bbtSquare); + + Matrix uHat = eigen.getV(); + svalues = eigen.getRealEigenvalues().clone(); + + svalues.assign(Functions.SQRT); + + // save/redistribute UHat + fs.mkdirs(uHatPath); + DistributedRowMatrixWriter.write(uHatPath = + new Path(uHatPath, "uhat.seq"), conf, uHat); + + // save sigma. 
+ SSVDHelper.saveVector(svalues, + svPath = new Path(svPath, "svalues.seq"), + conf); + + UJob ujob = null; + if (computeU) { + ujob = new UJob(); + ujob.run(conf, + new Path(btPath, BtJob.OUTPUT_Q + "-*"), + uHatPath, + svPath, + uPath, + k, + reduceTasks, + labelType, + OutputScalingEnum.NOSCALING); + // actually this is map-only job anyway + } + + UJob uhsjob = null; + if (cUHalfSigma) { + uhsjob = new UJob(); + uhsjob.run(conf, + new Path(btPath, BtJob.OUTPUT_Q + "-*"), + uHatPath, + svPath, + uHalfSigmaPath, + k, + reduceTasks, + labelType, + OutputScalingEnum.HALFSIGMA); + } + + UJob usjob = null; + if (cUSigma) { + usjob = new UJob(); + usjob.run(conf, + new Path(btPath, BtJob.OUTPUT_Q + "-*"), + uHatPath, + svPath, + uSigmaPath, + k, + reduceTasks, + labelType, + OutputScalingEnum.SIGMA); + } + + VJob vjob = null; + if (computeV) { + vjob = new VJob(); + vjob.run(conf, + new Path(btPath, BtJob.OUTPUT_BT + "-*"), + pcaMeanPath, + sqPath, + uHatPath, + svPath, + vPath, + k, + reduceTasks, + OutputScalingEnum.NOSCALING); + } + + VJob vhsjob = null; + if (cVHalfSigma) { + vhsjob = new VJob(); + vhsjob.run(conf, + new Path(btPath, BtJob.OUTPUT_BT + "-*"), + pcaMeanPath, + sqPath, + uHatPath, + svPath, + vHalfSigmaPath, + k, + reduceTasks, + OutputScalingEnum.HALFSIGMA); + } + + VJob vsjob = null; + if (cVSigma) { + vsjob = new VJob(); + vsjob.run(conf, + new Path(btPath, BtJob.OUTPUT_BT + "-*"), + pcaMeanPath, + sqPath, + uHatPath, + svPath, + vSigmaPath, + k, + reduceTasks, + OutputScalingEnum.SIGMA); + } + + if (ujob != null) { + ujob.waitForCompletion(); + this.uPath = uPath.toString(); + } + if (uhsjob != null) { + uhsjob.waitForCompletion(); + this.uHalfSigmaPath = uHalfSigmaPath.toString(); + } + if (usjob != null) { + usjob.waitForCompletion(); + this.uSigmaPath = uSigmaPath.toString(); + } + if (vjob != null) { + vjob.waitForCompletion(); + this.vPath = vPath.toString(); + } + if (vhsjob != null) { + vhsjob.waitForCompletion(); + this.vHalfSigmaPath = vHalfSigmaPath.toString(); + } + if (vsjob != null) { + vsjob.waitForCompletion(); + this.vSigmaPath = vSigmaPath.toString(); + } + + } catch (InterruptedException exc) { + throw new IOException("Interrupted", exc); + } catch (ClassNotFoundException exc) { + throw new IOException(exc); + + } finally { + IOUtils.close(closeables); + } + } + + enum OutputScalingEnum { + NOSCALING, SIGMA, HALFSIGMA + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SparseRowBlockAccumulator.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SparseRowBlockAccumulator.java b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SparseRowBlockAccumulator.java new file mode 100644 index 0000000..081f55a --- /dev/null +++ b/community/mahout-mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SparseRowBlockAccumulator.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.stochasticsvd;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.mahout.math.Vector;
+
+/**
+ * Aggregates incoming rows into blocks based on the row number (long). Rows can
+ * be sparse (meaning they may arrive at large intervals) and don't even have to
+ * come in any particular order, but they should arrive in proximity to each
+ * other, so that by the time we emit a block key we have hopefully aggregated
+ * more than one row into it.
+ * <P>
+ *
+ * If the block is sufficiently large to fit all rows that a mapper may produce,
+ * it will never even hit a spill, since we would already be summing rows
+ * efficiently in the mapper.
+ * <P>
+ *
+ * For sparse inputs this also works especially well if the transposed columns
+ * of the left-side matrix and the corresponding rows of the right-side matrix
+ * are sparse in the same elements.
+ * <P>
+ *
+ */
+public class SparseRowBlockAccumulator implements
+    OutputCollector<Long, Vector>, Closeable {
+
+  private final int height;
+  private final OutputCollector<LongWritable, SparseRowBlockWritable> delegate;
+  private long currentBlockNum = -1;
+  private SparseRowBlockWritable block;
+  private final LongWritable blockKeyW = new LongWritable();
+
+  public SparseRowBlockAccumulator(int height,
+                                   OutputCollector<LongWritable, SparseRowBlockWritable> delegate) {
+    this.height = height;
+    this.delegate = delegate;
+  }
+
+  private void flushBlock() throws IOException {
+    if (block == null || block.getNumRows() == 0) {
+      return;
+    }
+    blockKeyW.set(currentBlockNum);
+    delegate.collect(blockKeyW, block);
+    block.clear();
+  }
+
+  @Override
+  public void collect(Long rowIndex, Vector v) throws IOException {
+
+    long blockKey = rowIndex / height;
+
+    if (blockKey != currentBlockNum) {
+      flushBlock();
+      if (block == null) {
+        block = new SparseRowBlockWritable(100);
+      }
+      currentBlockNum = blockKey;
+    }
+
+    block.plusRow((int) (rowIndex % height), v);
+  }
+
+  @Override
+  public void close() throws IOException {
+    flushBlock();
+  }
+
+}
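Editor's note: for orientation, here is a minimal, hypothetical driver (not part of this patch) that exercises SparseRowBlockAccumulator the way BtJob's mapper does: rows are pushed in by long row index, and a block is handed to the delegate collector only when the row index crosses into a different block or the accumulator is closed. The standalone class name and the console-printing delegate are illustrative assumptions; the accumulator, writable and vector classes are the ones added by this commit.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.hadoop.stochasticsvd.SparseRowBlockAccumulator;
import org.apache.mahout.math.hadoop.stochasticsvd.SparseRowBlockWritable;

public class SparseRowBlockAccumulatorSketch {

  public static void main(String[] args) throws IOException {
    // Stand-in for the mapper's real output collector: just report which block key was emitted.
    OutputCollector<LongWritable, SparseRowBlockWritable> delegate =
        (blockKey, block) -> System.out.println("emitted block " + blockKey.get());

    int blockHeight = 4; // rows 0..3 go to block 0, rows 4..7 to block 1, ...
    Vector row = new DenseVector(new double[] {1.0, 2.0, 3.0});

    try (SparseRowBlockAccumulator acc =
             new SparseRowBlockAccumulator(blockHeight, delegate)) {
      acc.collect(0L, row); // accumulated into block 0, nothing emitted yet
      acc.collect(2L, row); // still block 0, summed in memory
      acc.collect(5L, row); // block key changes to 1, so block 0 is flushed to the delegate
    }                       // close() flushes the remaining partial block 1
  }
}

The second collect call is the point of the buffering: rows that map to the same block are combined in memory before anything reaches the (potentially spilling) Hadoop collector.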

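Editor's note: to tie the SSVDSolver driver flow together, here is a minimal, hypothetical usage sketch of the public API introduced above. The input/output paths and the k, p, q and reduce-task values are illustrative assumptions, not values taken from the patch.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.hadoop.stochasticsvd.SSVDSolver;

public class SSVDSolverUsageSketch {

  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();

    SSVDSolver solver = new SSVDSolver(conf,
                                       new Path[] {new Path("/data/A")}, // DRM input (hypothetical)
                                       new Path("/out/ssvd"),            // output root (hypothetical)
                                       30000, // ablockRows: q-block height
                                       40,    // k: requested rank
                                       15,    // p: oversampling
                                       10);   // reduce tasks
    solver.setQ(1);            // one extra power iteration for better precision
    solver.setComputeU(true);
    solver.setComputeV(true);
    solver.setOverwrite(true); // clean the output folder if it already exists

    solver.run();

    Vector sigma = solver.getSingularValues(); // k+p singular values, largest first
    System.out.println("singular values: " + sigma);
    System.out.println("U path: " + solver.getUPath());
    System.out.println("V path: " + solver.getVPath());
  }
}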