Space: Apache Mahout (https://cwiki.apache.org/confluence/display/MAHOUT)
Page: Dimensional Reduction 
(https://cwiki.apache.org/confluence/display/MAHOUT/Dimensional+Reduction)

Change Comment:
---------------------------------------------------------------------
Added example using Amazon EMR

Edited by Timothy Potter:
---------------------------------------------------------------------
Matrix algebra underpins the way many Big Data algorithms and data structures 
are composed. Full-text search can be viewed as multiplying the term-document 
matrix by the query vector (giving a vector over documents whose components are 
the relevance scores). Computing co-occurrences in a collaborative filtering 
context (people who viewed X also viewed Y, or ratings-based CF like the Netflix 
Prize contest) amounts to squaring the user-item interaction matrix. Users who 
are k degrees separated from each other in a social network or web graph can be 
found by looking at the k-fold product of the graph adjacency matrix. The list 
goes on, and these are all cases where the linear structure of the matrix is 
preserved!
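
To make the three examples above concrete, here is a toy sketch in plain Python (no Mahout involved). The matrices, the helper names, and the choice to orient the search matrix as documents-by-terms (so that the product yields one score per document) are all made up for illustration. It shows search as a matrix-vector product, item co-occurrence as A^t * A, and k-step reachability as a power of the adjacency matrix.

```python
# Toy illustrations of the matrix operations described above.
# Matrices are plain lists of rows; all data is made up.

def mat_vec(m, v):
    """Multiply matrix m (a list of rows) by vector v."""
    return [sum(a * b for a, b in zip(row, v)) for row in m]

def transpose(m):
    """Swap rows and columns."""
    return [list(col) for col in zip(*m)]

def mat_mul(a, b):
    """Multiply a (r x s) by b (s x t)."""
    b_cols = transpose(b)
    return [[sum(x * y for x, y in zip(row, col)) for col in b_cols] for row in a]

# Search: a document-by-term matrix (3 docs x 4 terms) times a query
# vector gives one relevance score per document.
doc_term = [[1, 0, 2, 0],
            [0, 1, 0, 1],
            [1, 1, 1, 0]]
query = [1, 0, 1, 0]                      # query uses terms 0 and 2
scores = mat_vec(doc_term, query)         # [3, 0, 2]

# Co-occurrence: A^t * A on a user-by-item matrix counts, for each pair
# of items, how many users interacted with both.
user_item = [[1, 1, 0],
             [1, 0, 1],
             [1, 1, 1]]
cooccur = mat_mul(transpose(user_item), user_item)  # [[3, 2, 2], [2, 2, 1], [2, 1, 2]]

# k-step reachability: entry (i, j) of the k-th power of the adjacency
# matrix counts walks of length k from node i to node j.
adj = [[0, 1, 0, 0],    # path graph 0 - 1 - 2 - 3
       [1, 0, 1, 0],
       [0, 1, 0, 1],
       [0, 0, 1, 0]]
adj2 = mat_mul(adj, adj)  # adj2[0][2] == 1: node 2 is two hops from node 0
```

Note that a nonzero entry in the k-th power means a walk of exactly k steps exists; finding the exact degree of separation means checking the smallest such k.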

Each of these examples deals with matrices which tend to be tremendously large 
(often millions to tens of millions to hundreds of millions of rows or more, 
sometimes by a comparable number of columns), but also rather sparse. Sparse 
matrices are nice in some respects: a dense matrix which is 10^7 on a side would 
have 100 trillion non-zero entries! But the sparsity is often problematic, 
because any two given rows (or columns) of the matrix may have zero overlap. 
Additionally, any machine-learning work done on the data which comprises the 
rows has to deal with what is known as "the curse of dimensionality": for 
example, there are too many columns (features) for most regression or 
classification algorithms to be trained on them directly.
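
As a quick check of the dense-matrix figure above, here is the arithmetic; the storage line assumes one 8-byte double per entry, which the original does not state:

```latex
(10^7)^2 = 10^{14} \text{ entries} = 100 \text{ trillion}, \qquad
10^{14} \times 8 \text{ bytes} = 8 \times 10^{14} \text{ bytes} \approx 800 \text{ TB}
```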

One of the more useful approaches to dealing with such huge sparse data sets is 
dimensionality reduction, where a lower-dimensional subspace of the original 
column (feature) space of your data is found or constructed, and your rows are 
mapped into that subspace (or sub-manifold). In this reduced dimensional space, 
the components that are "important" to the distance between points are 
exaggerated and unimportant ones are washed away; additionally, the sparsity of 
your rows is traded for drastically lower-dimensional, but dense, "signatures". 
While this loss of sparsity can lead to its own complications, a proper 
dimensionality reduction can help reveal the most important features of your 
data, expose correlations among your supposedly independent original variables, 
and smooth over the zeroes in your correlation matrix.

One of the most straightforward techniques for dimensionality reduction is 
matrix decomposition: singular value decomposition, eigendecomposition, 
non-negative matrix factorization, etc. In their truncated form these 
decompositions are an excellent first approach toward linearity-preserving 
unsupervised feature selection and dimensional reduction. Of course, sparse 
matrices which don't fit in RAM need special treatment as far as decomposition 
is concerned: parallelizable and/or stream-oriented algorithms are needed.

h1. Singular Value Decomposition

Mahout currently contains (as of 0.3, the first release with MAHOUT-180 
applied) two scalable implementations of SVD. The first is a stream-oriented 
implementation using the Asymmetric Generalized Hebbian Algorithm outlined in 
Genevieve Gorrell & Brandyn Webb's paper ([Gorrell and Webb 2005|http://www.dcs.shef.ac.uk/~genevieve/gorrell_webb.pdf]). 
The second is a [Lanczos|http://en.wikipedia.org/wiki/Lanczos_algorithm] 
implementation, available both single-threaded, in the 
o.a.m.math.decomposer.lanczos package (math module), and as a Hadoop map-reduce 
(series of) job(s) in the o.a.m.math.hadoop.decomposer package (core module). 
Coming soon: stochastic decomposition.

h2. Lanczos

The Lanczos algorithm is designed for eigendecomposition, but like any such 
algorithm, getting singular vectors out of it is immediate (the singular vectors 
of a matrix A are just the eigenvectors of A^t * A or A * A^t). Lanczos works by 
taking a starting seed vector *v* (with cardinality equal to the number of 
columns of the matrix A) and repeatedly multiplying A by the result: *v'* = 
A.times(*v*) (then subtracting off the components proportional to the previous 
*v'*'s, and building up an auxiliary matrix of projections). When A is not 
symmetric (in particular, when it is not square), you actually want to 
repeatedly multiply A^t * A by *v*: *v'* = (A^t * A).times(*v*), or 
equivalently, in Mahout, A.timesSquared(*v*). timesSquared is merely an 
optimization: by changing the order of summation in (A^t * A).times(*v*), the 
same computation can be done in one pass over the rows of A instead of two.
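
A minimal sketch of the timesSquared reordering, assuming A is held as a plain Python list of rows (this is an illustration, not Mahout's code): A^t * A * v equals the sum over rows a_i of (a_i . v) * a_i, so each row is read once instead of twice. The helper names are hypothetical.

```python
# Why A^t * A * v can be computed in ONE pass over the rows of A.
# Assumption: A is a list of rows with numCols entries each, and v has
# numCols entries, so A^t * A * v also has numCols entries.

def dot(x, y):
    return sum(a * b for a, b in zip(x, y))

def times(a, v):
    """A * v: one entry per row of A (first pass over the rows)."""
    return [dot(row, v) for row in a]

def transpose_times(a, u):
    """A^t * u: a second pass over the rows of A."""
    out = [0.0] * len(a[0])
    for row, u_i in zip(a, u):
        for j, a_ij in enumerate(row):
            out[j] += a_ij * u_i
    return out

def times_squared(a, v):
    """A^t * A * v in a single pass: sum over rows a_i of (a_i . v) * a_i."""
    out = [0.0] * len(v)
    for row in a:
        scale = dot(row, v)            # one scalar per row
        for j, a_ij in enumerate(row):
            out[j] += scale * a_ij     # accumulate a scaled copy of the row
    return out

A = [[1.0, 2.0, 0.0],
     [0.0, 1.0, 3.0],
     [4.0, 0.0, 1.0],
     [0.0, 0.0, 2.0]]
v = [1.0, -1.0, 0.5]

two_pass = transpose_times(A, times(A, v))
one_pass = times_squared(A, v)          # [17.0, -1.5, 8.0], same as two_pass
```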

After *k* iterations of *v_i* = A.timesSquared(*v_(i-1)*), a *k*-by-*k* 
tridiagonal matrix has been created (the auxiliary matrix mentioned above), from 
which a good (often extremely good) approximation to *k* of the singular values 
of A may be efficiently extracted (and, using the basis spanned by the *v_i*, 
the *k* singular *vectors* as well). Which *k*? They are actually spread across 
the entire spectrum: the first few will most certainly be the largest singular 
values, and the bottom few will be the smallest, but there is no guarantee that 
just because you have the n'th largest singular value of A, you also have the 
(n-1)'st. A good rule of thumb is to ask Lanczos for 3k singular vectors and 
then discard the bottom two thirds, if you primarily want the k largest singular 
values (which is the case when using Lanczos for dimensional reduction).
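
The following is a small, hedged sketch of the idea in plain Python, not Mahout's LanczosSolver. It runs k Lanczos steps against the timesSquared operator, collects the diagonal (alphas) and off-diagonal (betas) of the tridiagonal auxiliary matrix, and extracts its eigenvalues by Sturm-sequence bisection. The full reorthogonalization, the fixed all-ones seed vector, and the bisection eigensolver are choices made for this sketch. The resulting values approximate eigenvalues of A^t * A, so their square roots approximate singular values of A.

```python
import math

def times_squared(a, v):
    """A^t * A * v in one pass over the rows of A."""
    out = [0.0] * len(v)
    for row in a:
        scale = sum(x * y for x, y in zip(row, v))
        for j, a_ij in enumerate(row):
            out[j] += scale * a_ij
    return out

def lanczos(matvec, n, k):
    """Run up to k (>= 1) Lanczos steps on a symmetric operator of size n.

    Returns the diagonal (alphas) and off-diagonal (betas) of the
    tridiagonal auxiliary matrix.
    """
    v = [1.0 / math.sqrt(n)] * n            # normalized all-ones seed
    basis, alphas, betas = [], [], []
    while True:
        basis.append(v)
        w = matvec(v)
        alphas.append(sum(a * b for a, b in zip(w, v)))
        if len(alphas) == k:
            break
        # subtract the components along all previous vectors
        # (full reorthogonalization, for numerical stability)
        for q in basis:
            c = sum(a * b for a, b in zip(w, q))
            w = [wi - c * qi for wi, qi in zip(w, q)]
        beta = math.sqrt(sum(x * x for x in w))
        if beta < 1e-12:                     # Krylov space exhausted
            break
        betas.append(beta)
        v = [x / beta for x in w]
    return alphas, betas

def sturm_count(alphas, betas, x):
    """Number of eigenvalues of the tridiagonal matrix that are < x."""
    count, d = 0, 1.0
    for i, a in enumerate(alphas):
        b2 = betas[i - 1] ** 2 if i > 0 else 0.0
        d = (a - x) - b2 / d
        if d == 0.0:
            d = 1e-300                       # avoid division by zero
        if d < 0:
            count += 1
    return count

def tridiag_eigenvalues(alphas, betas, tol=1e-10):
    """All eigenvalues of the tridiagonal matrix by bisection, descending."""
    # Gershgorin bound on the spectrum
    radius = max(abs(a) for a in alphas) + 2 * max([abs(b) for b in betas] + [0.0])
    eigs = []
    for j in range(len(alphas)):             # j-th smallest eigenvalue
        lo, hi = -radius, radius
        while hi - lo > tol:
            mid = (lo + hi) / 2
            if sturm_count(alphas, betas, mid) > j:
                hi = mid
            else:
                lo = mid
        eigs.append((lo + hi) / 2)
    return sorted(eigs, reverse=True)

A = [[3.0, 0.0, 0.0],
     [0.0, 2.0, 0.0],
     [0.0, 0.0, 1.0]]
alphas, betas = lanczos(lambda v: times_squared(A, v), n=3, k=3)
# values approximate eigenvalues of A^t * A, i.e. squared singular values
singular_values = [math.sqrt(max(e, 0.0)) for e in tridiag_eigenvalues(alphas, betas)]
```

With k equal to the number of columns, as in this tiny example, the whole spectrum is recovered up to rounding. With k much smaller than numCols, only the extremes of the spectrum converge reliably, which is what the 3k rule of thumb works around.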

h3. Parallelization Strategy

Lanczos is "embarrassingly parallelizable": multiplying a matrix by a vector can 
be carried out a row at a time with no communication until the end, when the 
intermediate matrix-by-vector outputs are accumulated into one final vector. 
When the operation is truly A.times(*v*), the final accumulation doesn't even 
have collision / synchronization issues (the outputs are individual, separate 
entries of a single vector), so multicore approaches can be very fast, and there 
should also be tricks to speed things up on Hadoop. In the asymmetric case, 
where the operation is A.timesSquared(*v*), the accumulation does require 
synchronization (the vectors to be summed have nonzero elements all across their 
range), but by delaying writes to disk until Mapper close() and using the 
Reducer as the Combiner as well, the accumulation bottleneck is nowhere near a 
single point.
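
To illustrate the accumulation pattern, here is a single-process simulation (hypothetical helper names; not Mahout's job code). Each "mapper" owns a split of rows, keeps a local partial A^t * A * v, and emits it once at the end, and the same element-wise sum serves as both combiner and reducer.

```python
from functools import reduce

def map_partition(rows, v):
    """One 'mapper': accumulate a partial A^t * A * v over its rows only,
    emitting a single partial vector at the end (cf. writing in close())."""
    partial = [0.0] * len(v)
    for row in rows:
        scale = sum(x * y for x, y in zip(row, v))
        for j, a_ij in enumerate(row):
            partial[j] += scale * a_ij
    return partial

def add_vectors(x, y):
    """Combiner and reducer are the same function: element-wise sum."""
    return [a + b for a, b in zip(x, y)]

def distributed_times_squared(a, v, num_mappers):
    # split the rows into contiguous splits, one per mapper
    size = -(-len(a) // num_mappers)          # ceiling division
    splits = [a[i:i + size] for i in range(0, len(a), size)]
    partials = [map_partition(split, v) for split in splits]
    return reduce(add_vectors, partials)

A = [[1.0, 2.0, 0.0],
     [0.0, 1.0, 3.0],
     [4.0, 0.0, 1.0],
     [0.0, 0.0, 2.0]]
v = [1.0, -1.0, 0.5]
result = distributed_times_squared(A, v, num_mappers=2)   # [17.0, -1.5, 8.0]
```

Because addition is associative and commutative, the result does not depend on how rows are split, which is why the reducer logic can be reused as a combiner.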

h3. Mahout usage

The Mahout DistributedLanczosSolver is invoked by the <MAHOUT_HOME>/bin/mahout 
svd command. This command takes the following arguments (which can be 
reproduced by entering the command with no arguments):

{noformat}
Job-Specific Options:                                                           
  --input (-i) input                      Path to job input directory.          
  --output (-o) output                    The directory pathname for output.    
  --numRows (-nr) numRows                 Number of rows of the input matrix    
  --numCols (-nc) numCols                 Number of columns of the input matrix 
  --rank (-r) rank                        Desired decomposition rank (note:     
                                          only roughly 1/4 to 1/3 of these will 
                                          have the top portion of the spectrum) 
  --symmetric (-sym) symmetric            Is the input matrix square and        
                                          symmetric?                            
  --cleansvd (-cl) cleansvd               Run the EigenVerificationJob to clean 
                                          the eigenvectors after SVD            
  --maxError (-err) maxError              Maximum acceptable error              
  --minEigenvalue (-mev) minEigenvalue    Minimum eigenvalue to keep the vector 
                                          for                                   
  --inMemory (-mem) inMemory              Buffer eigen matrix into memory (if   
                                          you have enough!)                     
  --help (-h)                             Print out help                        
  --tempDir tempDir                       Intermediate output directory         
  --startPhase startPhase                 First phase to run                    
  --endPhase endPhase                     Last phase to run                     
{noformat}

The following short-form invocation performs the SVD on the input data (the 
short flags listed above, such as -i and -o, may be used in place of the long 
ones):
{code}
  <MAHOUT_HOME>/bin/mahout svd \
  --input <path to input matrix> \
  --output <output directory> \
  --numRows <number of rows of the input matrix> \
  --numCols <number of columns of the input matrix> \
  --rank <desired decomposition rank> \
  --symmetric <true if the input matrix is square and symmetric>
{code}

The --input argument is the location on HDFS of a 
SequenceFile<Writable,VectorWritable> (preferably containing 
SequentialAccessSparseVector instances) which you wish to decompose. Each of its 
vectors has --numCols entries. --numRows is the number of input rows and is used 
to properly size the matrix data structures.

After execution, the --output directory will have a file named 
"rawEigenvectors" containing the raw eigenvectors. Because the 
DistributedLanczosSolver sometimes produces "extra" eigenvectors whose 
eigenvalues aren't valid, and also scales all of the eigenvalues down by the 
max eigenvalue (to avoid floating point overflow), there is an additional step 
which outputs the correctly scaled (and non-spurious) eigenvector/value pairs. 
This is done by the "cleansvd" step (cf. EigenVerificationJob).

If you have run the short-form svd invocation above and require this "cleaning" 
of the eigen/singular output, you can run "cleansvd" as a separate command:
{code}
  <MAHOUT_HOME>/bin/mahout cleansvd \
  --eigenInput <path to raw eigenvectors> \
  --corpusInput <path to corpus> \
  --output <path to output directory> \
  --maxError <maximum allowed error. Default is 0.5> \
  --minEigenvalue <minimum allowed eigenvalue. Default is 0.0> \
  --inMemory <true if the eigenvectors can all fit into memory. Default false>
{code} 

The --corpusInput is the input path from the previous step, --eigenInput is the 
output from the previous step (<output>/rawEigenvectors), and --output is the 
desired output path (same as the svd argument). The two "cleaning" parameters 
are --maxError, the maximum allowed 1 - cosAngle(v, A.timesSquared(v)), and 
--minEigenvalue. Eigenvectors which have too large an error or too small an 
eigenvalue are discarded. Optional argument: --inMemory. If you have enough 
memory on your local machine (not on the Hadoop cluster nodes!) to load all 
eigenvectors into memory at once (at least 8 bytes/double * rank * numCols), 
you will see some speedup in this cleaning process.
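
The cleaning criterion can be sketched as follows. This sketch assumes the eigenvalue of a candidate v is estimated as |A^t A v| / |v| and that a zero product counts as maximal error; these details, and the function names, are assumptions of the sketch rather than a description of EigenVerificationJob's actual code. The last line applies the memory rule of thumb to the rank and numCols used in the EMR example below.

```python
import math

def times_squared(a, v):
    """A^t * A * v in one pass over the rows of A."""
    out = [0.0] * len(v)
    for row in a:
        scale = sum(x * y for x, y in zip(row, v))
        for j, a_ij in enumerate(row):
            out[j] += scale * a_ij
    return out

def norm(x):
    return math.sqrt(sum(t * t for t in x))

def verify(a, candidates, max_error=0.5, min_eigenvalue=0.0):
    """Keep (eigenvalue, vector) pairs that pass both cleaning criteria."""
    kept = []
    for v in candidates:
        u = times_squared(a, v)
        nu, nv = norm(u), norm(v)
        if nu == 0.0 or nv == 0.0:
            error, eigenvalue = 1.0, 0.0       # no direction to compare
        else:
            cos_angle = sum(x * y for x, y in zip(v, u)) / (nu * nv)
            error = 1.0 - cos_angle            # 1 - cosAngle(v, A.timesSquared(v))
            eigenvalue = nu / nv               # |A^t A v| / |v|
        if error <= max_error and eigenvalue >= min_eigenvalue:
            kept.append((eigenvalue, v))
    return kept

A = [[3.0, 0.0, 0.0],
     [0.0, 2.0, 0.0],
     [0.0, 0.0, 1.0]]
s = 1.0 / math.sqrt(2.0)
candidates = [[1.0, 0.0, 0.0],   # true eigenvector, eigenvalue 9
              [0.0, 1.0, 0.0],   # true eigenvector, eigenvalue 4
              [s, s, 0.0],       # mixture: error about 0.067
              [0.0, 0.0, 1.0]]   # true eigenvector, eigenvalue 1
kept = verify(A, candidates, max_error=0.01, min_eigenvalue=2.0)
# kept eigenvalues: 9.0 and 4.0 (mixture fails maxError, last fails minEigenvalue)

# Memory rule of thumb for --inMemory: 8 bytes/double * rank * numCols
eigen_bytes = 8 * 100 * 20444     # 16,355,200 bytes, roughly 16 MB
```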

After execution, the --output directory will have a file named 
"cleanEigenvectors" containing the clean eigenvectors. 

These two steps can also be run together by the svd command, using the 
long-form invocation:
{code}
  <MAHOUT_HOME>/bin/mahout svd \
  --input <path to input matrix> \
  --output <output directory> \
  --numRows <number of rows of the input matrix> \
  --numCols <number of columns of the input matrix> \
  --rank <desired decomposition rank> \
  --symmetric <true if the input matrix is square and symmetric> \
  --cleansvd "true" \
  --maxError <maximum allowed error. Default is 0.5> \
  --minEigenvalue <minimum allowed eigenvalue. Default is 0.0> \
  --inMemory <true if the eigenvectors can all fit into memory. Default false>
{code}

After execution, the --output directory will contain two files: the 
"rawEigenvectors" and the "cleanEigenvectors".

TODO: also allow exclusion based on improper orthogonality (currently computed, 
but not checked against constraints).

h3. Example: SVD of ASF Mail Archives on Amazon Elastic MapReduce

This section walks you through a complete example of running the Mahout SVD job 
on an Amazon Elastic MapReduce cluster and then preparing the output to be used 
for clustering. This example was developed as part of the effort to benchmark 
Mahout's clustering algorithms using a large document set (see 
[MAHOUT-588|https://issues.apache.org/jira/browse/MAHOUT-588]). Specifically, 
we use the ASF mail archives that have been parsed and converted to the Hadoop 
SequenceFile format (block-compressed) and saved to a public S3 folder: 
s3://asf-mail-archives/mahout-0.4/sequence-files. Overall, there are 6,094,444 
key-value pairs in 283 files taking around 5.7GB of disk.

The bulk of the content for this section was extracted from the Mahout user 
mailing list; see: [Need a little help with using SVD|http://search.lucidimagination.com/search/document/748181681ae5238b/need_a_little_help_with_using_svd#134fb2771fd52928]

Note: Some of this work is due in part to credits donated by the Amazon Elastic 
MapReduce team.

h5. 1. Launch EMR Cluster

For a detailed explanation of the steps involved in launching an Amazon Elastic 
MapReduce cluster for running Mahout jobs, please read the "Building Vectors 
for Large Document Sets" section of [Mahout on Elastic MapReduce|https://cwiki.apache.org/confluence/display/MAHOUT/Mahout+on+Elastic+MapReduce].

In the remaining steps below, remember to replace JOB_ID with the Job ID of 
your EMR cluster.

h5. 2. Load Mahout 0.5+ JAR into S3

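These steps were created with mahout-0.5-SNAPSHOT because they rely on the 
patch for [MAHOUT-639|https://issues.apache.org/jira/browse/MAHOUT-639]. The 
commands below refer to the uploaded job JAR as 
s3://BUCKET/mahout-examples-0.5-SNAPSHOT-job.jar; replace BUCKET with your own 
S3 bucket.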

h5. 3. Copy TFIDF Vectors into HDFS

Before running your SVD job on the vectors, you need to copy them from S3 to 
your EMR cluster's HDFS.

{code}
elastic-mapreduce --jar s3://elasticmapreduce/samples/distcp/distcp.jar \
  --arg s3n://ACCESS_KEY:SECRET_KEY@asf-mail-archives/mahout-0.4/sparse-1-gram-stem/tfidf-vectors \
  --arg /asf-mail-archives/mahout/sparse-1-gram-stem/tfidf-vectors \
  -j JOB_ID
{code}

h5. 4. Run the SVD Job

Now you're ready to run the SVD job on the vectors stored in HDFS:

{code}
elastic-mapreduce --jar s3://BUCKET/mahout-examples-0.5-SNAPSHOT-job.jar \
  --main-class org.apache.mahout.driver.MahoutDriver \
  --arg svd \
  --arg -i --arg /asf-mail-archives/mahout/sparse-1-gram-stem/tfidf-vectors \
  --arg -o --arg /asf-mail-archives/mahout/svd \
  --arg --rank --arg 100 \
  --arg --numCols --arg 20444 \
  --arg --numRows --arg 6076937 \
  --arg --cleansvd --arg "true" \
  -j JOB_ID
{code}

This will run 100 iterations of the LanczosSolver SVD job to produce 87 
eigenvectors in:

{code}
/asf-mail-archives/mahout/svd/cleanEigenvectors
{code}

Only 87 eigenvectors were produced because of the cleanup step, which removes 
any duplicate eigenvectors caused by convergence issues and numeric overflow, 
and any that don't appear to be "eigen" enough (i.e., they don't satisfy the 
eigenvector criterion with high enough fidelity). - Jake Mannix

h5. 5. Transform your TFIDF Vectors into a Mahout Matrix

The TF-IDF vectors created by the seq2sparse job are a 
SequenceFile<Text,VectorWritable>. The Mahout rowid job transforms these 
vectors into matrix form: a SequenceFile<IntWritable,VectorWritable> and a 
SequenceFile<IntWritable,Text> (the original file is the join of these two new 
ones on the new int key).
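
A minimal sketch of the rowid transformation in plain Python, assuming ids are assigned sequentially in input order. The keys and vectors are made-up placeholders, sparse vectors are represented as dicts, and the function name is hypothetical. Joining the two outputs on the int key gives back the original pairs.

```python
def rowid(pairs):
    """Split (text_key, vector) pairs into (int_id, vector) rows and an
    (int_id, text_key) docIndex, assigning ids 0, 1, 2, ... in input order."""
    matrix, doc_index = [], []
    for i, (key, vector) in enumerate(pairs):
        matrix.append((i, vector))
        doc_index.append((i, key))
    return matrix, doc_index

# Made-up input: document keys mapped to sparse TF-IDF vectors
# ({term index: weight}).
tfidf = [("doc-a", {0: 0.5, 7: 1.2}),
         ("doc-b", {3: 0.9}),
         ("doc-c", {0: 0.1, 3: 0.4})]
matrix, doc_index = rowid(tfidf)

# Joining the two outputs on the int key recovers the original pairs.
keys = dict(doc_index)
rejoined = [(keys[i], vector) for i, vector in matrix]
```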

{code}
elastic-mapreduce --jar s3://BUCKET/mahout-examples-0.5-SNAPSHOT-job.jar \
  --main-class org.apache.mahout.driver.MahoutDriver \
  --arg rowid \
  --arg -Dmapred.input.dir=/asf-mail-archives/mahout/sparse-1-gram-stem/tfidf-vectors \
  --arg -Dmapred.output.dir=/asf-mail-archives/mahout/sparse-1-gram-stem/tfidf-matrix \
  -j JOB_ID
{code}

This is not a distributed job and will only run on the master server in your 
EMR cluster. The job produces the following output:

{code}
/asf-mail-archives/mahout/sparse-1-gram-stem/tfidf-matrix/docIndex
/asf-mail-archives/mahout/sparse-1-gram-stem/tfidf-matrix/matrix
{code}

where docIndex is the SequenceFile<IntWritable,Text> and matrix is 
SequenceFile<IntWritable,VectorWritable>.

h5. 6. Transpose the Matrix

Our ultimate goal is to multiply the TF-IDF vector matrix by our SVD 
eigenvectors. For the mathematically inclined: from the rowid job, we now have 
an m x n matrix T (m=6076937, n=20444). The SVD eigenvector matrix E is p x n 
(p=87, n=20444). To multiply these two matrices, we need to transpose E so that 
the number of columns in T equals the number of rows in E^T (E^T is n x p). The 
result of the multiplication, T E^T, is an m x p matrix (m=6076937, p=87).

However, in practice, computing the matrix product of two matrices as a 
map-reduce job is done efficiently as a map-side join on two row-based matrices 
with the same number of rows; it is the numbers of columns which differ. In 
particular, if you take a matrix X which is represented as a set of numRowsX 
rows, each of which has numColsX entries, and another matrix Y with numRowsY == 
numRowsX rows, each of which has numColsY (!= numColsX) entries, then by summing 
the outer products of each of the numRowsX pairs of vectors, you get a matrix Z 
with numRowsZ == numColsX and numColsZ == numColsY (if you instead take the 
reverse outer product of the vector pairs, you end up with the transpose of this 
final result, with numRowsZ == numColsY and numColsZ == numColsX). - Jake Mannix
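
Jake's explanation can be checked with a small pure-Python sketch. The function name is hypothetical, and this is not Mahout's matrix multiplication job. Given X and Y with the same number of rows, summing the outer products of corresponding row pairs yields X^T Y, and swapping the arguments yields its transpose.

```python
def outer_product_sum(x_rows, y_rows):
    """Z = X^T Y for X and Y with the same number of rows, computed by summing
    the outer products of corresponding row pairs (a map-side join on the
    shared row index)."""
    z = [[0.0] * len(y_rows[0]) for _ in x_rows[0]]   # numColsX x numColsY
    for x, y in zip(x_rows, y_rows):                  # one pair per row index
        for r, x_r in enumerate(x):
            if x_r == 0:
                continue                              # sparse rows: skip zeros
            for c, y_c in enumerate(y):
                z[r][c] += x_r * y_c
    return z

X = [[1, 2],          # 3 rows x 2 columns
     [0, 1],
     [3, 0]]
Y = [[1, 0, 2, 1],    # 3 rows x 4 columns
     [2, 1, 0, 0],
     [0, 1, 1, 3]]

Z = outer_product_sum(X, Y)      # 2 x 4: [[1, 3, 5, 10], [4, 1, 4, 2]]
Z_reversed = outer_product_sum(Y, X)   # 4 x 2: the transpose of Z
```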

Thus, we need to transpose the matrix using Mahout's Transpose Job:

{code}
elastic-mapreduce --jar s3://BUCKET/mahout-examples-0.5-SNAPSHOT-job.jar \
  --main-class org.apache.mahout.driver.MahoutDriver \
  --arg transpose \
  --arg -i --arg /asf-mail-archives/mahout/sparse-1-gram-stem/tfidf-matrix/matrix \
  --arg --numRows --arg 6076937 \
  --arg --numCols --arg 20444 \
  -j JOB_ID
{code}

This job requires the patch for 
[MAHOUT-639|https://issues.apache.org/jira/browse/MAHOUT-639].

The job creates the following output:

{code}
/asf-mail-archives/mahout/sparse-1-gram-stem/tfidf-matrix/transpose-##
{code}

The ## suffix is computed internally by Mahout, which makes it hard to script 
the matrixmult job; see 
[MAHOUT-655|https://issues.apache.org/jira/browse/MAHOUT-655].

h5. 7. Transpose Eigenvectors

If you followed Jake's explanation in step 6 above, then you know that we also 
need to transpose the eigenvectors:

{code}
elastic-mapreduce --jar s3://BUCKET/mahout-examples-0.5-SNAPSHOT-job.jar \
  --main-class org.apache.mahout.driver.MahoutDriver \
  --arg transpose \
  --arg -i --arg /asf-mail-archives/mahout/svd/cleanEigenvectors \
  --arg --numRows --arg 87 \
  --arg --numCols --arg 20444 \
  -j JOB_ID
{code}

The job creates the following output:

{code}
/asf-mail-archives/mahout/svd/transpose-##
{code}

h5. 8. Matrix Multiplication

Lastly, we need to multiply the two transposed matrices using Mahout's 
matrixmult job:

{code}
elastic-mapreduce --jar s3://BUCKET/mahout-examples-0.5-SNAPSHOT-job.jar \
  --main-class org.apache.mahout.driver.MahoutDriver \
  --arg matrixmult \
  --arg --numRowsA --arg 20444 \
  --arg --numColsA --arg 6076937 \
  --arg --numRowsB --arg 20444 \
  --arg --numColsB --arg 87 \
  --arg --inputPathA --arg /asf-mail-archives/mahout/sparse-1-gram-stem/tfidf-matrix/transpose-## \
  --arg --inputPathB --arg /asf-mail-archives/mahout/svd/transpose-## \
  -j JOB_ID
{code}

Notice that you need to know the names of the transpose output directories to 
supply the input paths. Hopefully this will be resolved soon.
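
As a sanity check on steps 6 through 8, the sketch below assumes, following the explanation in step 6, that matrixmult with inputs A and B computes A^T B from row pairs that share a row index. Under that assumption, feeding it T^T and E^T produces T E^T, whose rows are the documents projected onto the eigenvectors. T and E are tiny made-up stand-ins, not real TF-IDF data or eigenvectors, and the helper names are hypothetical.

```python
def transpose(m):
    return [list(col) for col in zip(*m)]

def matrixmult(a_rows, b_rows):
    """Assumed semantics (per step 6): A^T B from row pairs of A and B."""
    z = [[0.0] * len(b_rows[0]) for _ in a_rows[0]]
    for a, b in zip(a_rows, b_rows):
        for r, a_r in enumerate(a):
            for c, b_c in enumerate(b):
                z[r][c] += a_r * b_c
    return z

# T: m x n TF-IDF matrix (m=3 documents, n=4 terms)
T = [[1.0, 0.0, 2.0, 0.0],
     [0.0, 1.0, 0.0, 1.0],
     [1.0, 1.0, 1.0, 0.0]]
# E: p x n "eigenvector" rows (p=2); made-up stand-ins
E = [[1.0, 0.0, 0.0, 0.0],
     [0.0, 0.5, 0.5, 0.0]]

# steps 6-8: transpose both, then matrixmult(T^T, E^T) = T E^T (m x p)
reduced = matrixmult(transpose(T), transpose(E))   # [[1.0, 1.0], [0.0, 0.5], [1.0, 1.0]]

# each row is one document projected onto the p eigenvectors
projections = [[sum(t * e for t, e in zip(doc, ev)) for ev in E] for doc in T]
```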

h1. Resources

* http://www.dcs.shef.ac.uk/~genevieve/lsa_tutorial.htm
* http://www.puffinwarellc.com/index.php/news-and-articles/articles/30-singular-value-decomposition-tutorial.html

