Author: dlyubimov
Date: Tue Apr 19 14:32:17 2011
New Revision: 1095102
URL: http://svn.apache.org/viewvc?rev=1095102&view=rev
Log:
MAHOUT-638: second installment of fixes, sequential sparse source test
Added:
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverDenseTest.java
- copied, changed from r1094889,
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverTest.java
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverSparseSequentialTest.java
- copied, changed from r1094889,
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverTest.java
Removed:
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverTest.java
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BBtJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/Omega.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/QJob.java
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BBtJob.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BBtJob.java?rev=1095102&r1=1095101&r2=1095102&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BBtJob.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BBtJob.java
Tue Apr 19 14:32:17 2011
@@ -106,8 +106,7 @@ public final class BBtJob {
// this approach should reduce GC churn rate
double mul = btVec.getQuick(i);
for (int j = i; j < kp; j++) {
- bbtPartial.setQuick(i, j,
- bbtPartial.getQuick(i, j) + mul *
btVec.getQuick(j));
+ bbtPartial.setQuick(i, j, bbtPartial.getQuick(i, j) + mul *
btVec.getQuick(j));
}
}
}
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java?rev=1095102&r1=1095101&r2=1095102&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java
Tue Apr 19 14:32:17 2011
@@ -44,16 +44,18 @@ import org.apache.hadoop.mapreduce.lib.o
import org.apache.mahout.common.iterator.CopyConstructorIterator;
import
org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterator;
import org.apache.mahout.math.DenseVector;
-import org.apache.mahout.math.RandomAccessSparseVector;
-import org.apache.mahout.math.SequentialAccessSparseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
import org.apache.mahout.math.hadoop.stochasticsvd.QJob.QJobKeyWritable;
/**
- * Bt job. For details, see working notes in MAHOUT-376.
- *
+ * Bt job. For details, see working notes in MAHOUT-376. <P>
+ *
+ * Uses hadoop deprecated API wherever new api has not been updated
(MAHOUT-593),
+ * hence @SuppressWarnings("deprecation"). <P>
+ *
*/
+@SuppressWarnings("deprecation")
public final class BtJob {
public static final String OUTPUT_Q = "Q";
@@ -76,7 +78,8 @@ public final class BtJob {
private final VectorWritable btValue = new VectorWritable();
private int kp;
private final VectorWritable qRowValue = new VectorWritable();
- //private int qCount; // debug
+
+ // private int qCount; // debug
void loadNextQt() throws IOException {
Writable key = new QJobKeyWritable();
@@ -85,8 +88,8 @@ public final class BtJob {
boolean more = qInput.next(key, v);
assert more;
- mQt = GivensThinSolver.computeQtHat(v.getBlock(), blockNum == 0 ? 0
- : 1, new CopyConstructorIterator<UpperTriangular>(mRs.iterator()));
+ mQt = GivensThinSolver.computeQtHat(v.getBlock(), blockNum == 0 ? 0 : 1,
+ new CopyConstructorIterator<UpperTriangular>(mRs.iterator()));
r = mQt[0].length;
kp = mQt.length;
if (btValue.get() == null) {
@@ -114,7 +117,7 @@ public final class BtJob {
// // it doesn't matter if it overflows.
// m_outputs.write( OUTPUT_Q, oKey, oV);
// }
- //qCount++;
+ // qCount++;
}
@Override
@@ -128,6 +131,11 @@ public final class BtJob {
super.cleanup(context);
}
+ @SuppressWarnings("unchecked")
+ private void outputQRow(Writable key, Writable value) throws IOException {
+ outputs.getCollector(OUTPUT_Q, null).collect(key, value);
+ }
+
@Override
protected void map(Writable key, VectorWritable value, Context context)
throws IOException, InterruptedException {
if (mQt != null && cnt++ == r) {
@@ -141,17 +149,17 @@ public final class BtJob {
// output Bt outer products
Vector aRow = value.get();
int qRowIndex = r - cnt; // because QHats are initially stored in
- // reverse
+ // reverse
Vector qRow = qRowValue.get();
for (int j = 0; j < kp; j++) {
qRow.setQuick(j, mQt[j][qRowIndex]);
}
-
- outputs.getCollector(OUTPUT_Q, null).collect(key, qRowValue);
+
// make sure Qs are inheriting A row labels.
+ outputQRow(key,qRowValue);
Vector btRow = btValue.get();
- if ((aRow instanceof SequentialAccessSparseVector) || (aRow instanceof
RandomAccessSparseVector)) {
+ if (!aRow.isDense()) {
for (Vector.Element el : aRow) {
double mul = el.get();
for (int j = 0; j < kp; j++) {
@@ -160,7 +168,7 @@ public final class BtJob {
btKey.set(el.index());
context.write(btKey, btValue);
}
- } else {
+ } else {
int n = aRow.size();
for (int i = 0; i < n; i++) {
double mul = aRow.getQuick(i);
@@ -203,8 +211,8 @@ public final class BtJob {
int block = 0;
for (FileStatus fstat : rFiles) {
- SequenceFileValueIterator<VectorWritable> iterator =
- new SequenceFileValueIterator<VectorWritable>(fstat.getPath(),
true, context.getConfiguration());
+ SequenceFileValueIterator<VectorWritable> iterator = new
SequenceFileValueIterator<VectorWritable>(
+ fstat.getPath(), true, context.getConfiguration());
VectorWritable rValue;
try {
rValue = iterator.next();
@@ -212,8 +220,7 @@ public final class BtJob {
iterator.close();
}
if (block < blockNum && block > 0) {
- GivensThinSolver.mergeR(mRs.get(0),
- new UpperTriangular(rValue.get()));
+ GivensThinSolver.mergeR(mRs.get(0), new
UpperTriangular(rValue.get()));
} else {
mRs.add(new UpperTriangular(rValue.get()));
}
@@ -229,8 +236,8 @@ public final class BtJob {
private DenseVector accum;
@Override
- protected void reduce(IntWritable key, Iterable<VectorWritable> values,
- Context ctx) throws IOException, InterruptedException {
+ protected void reduce(IntWritable key, Iterable<VectorWritable> values,
Context ctx) throws IOException,
+ InterruptedException {
Iterator<VectorWritable> vwIter = values.iterator();
Vector vec = vwIter.next().get();
@@ -257,13 +264,12 @@ public final class BtJob {
int k,
int p,
int numReduceTasks,
- Class<? extends Writable> labelClass)
- throws ClassNotFoundException, InterruptedException, IOException {
+ Class<? extends Writable> labelClass) throws
ClassNotFoundException, InterruptedException,
+ IOException {
JobConf oldApiJob = new JobConf(conf);
- MultipleOutputs.addNamedOutput(oldApiJob, OUTPUT_Q,
- org.apache.hadoop.mapred.SequenceFileOutputFormat.class, labelClass,
- VectorWritable.class);
+ MultipleOutputs.addNamedOutput(oldApiJob, OUTPUT_Q,
org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
+ labelClass, VectorWritable.class);
Job job = new Job(oldApiJob);
job.setJobName("Bt-job");
@@ -285,8 +291,7 @@ public final class BtJob {
job.getConfiguration().set("mapreduce.output.basename", OUTPUT_BT);
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
- SequenceFileOutputFormat.setOutputCompressionType(job,
- CompressionType.BLOCK);
+ SequenceFileOutputFormat.setOutputCompressionType(job,
CompressionType.BLOCK);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(VectorWritable.class);
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/Omega.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/Omega.java?rev=1095102&r1=1095101&r2=1095102&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/Omega.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/Omega.java
Tue Apr 19 14:32:17 2011
@@ -20,8 +20,6 @@ package org.apache.mahout.math.hadoop.st
import java.util.Arrays;
import java.util.Random;
-import org.apache.mahout.math.RandomAccessSparseVector;
-import org.apache.mahout.math.SequentialAccessSparseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.Vector.Element;
@@ -61,13 +59,13 @@ public class Omega {
assert yRow.length == kp;
Arrays.fill(yRow, 0);
- if ((aRow instanceof SequentialAccessSparseVector) || (aRow instanceof
RandomAccessSparseVector)) {
+ if (!aRow.isDense()) {
int j = 0;
for (Element el : aRow) {
accumDots(j, el.get(), yRow);
j++;
}
-
+
} else {
int n = aRow.size();
for (int j = 0; j < n; j++) {
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/QJob.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/QJob.java?rev=1095102&r1=1095101&r2=1095102&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/QJob.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/QJob.java
Tue Apr 19 14:32:17 2011
@@ -51,11 +51,17 @@ import org.apache.mahout.math.VectorWrit
/**
* Compute first level of QHat-transpose blocks.
+ * <P>
*
- * See Mahout-376 woking notes for details.
+ * See Mahout-376 working notes for details.
+ * <P>
*
+ * Uses some of Hadoop's deprecated API wherever a newer API is not available.
+ * Hence, @SuppressWarnings("deprecation") for imports (MAHOUT-593).
+ * <P>
*
*/
+@SuppressWarnings("deprecation")
public final class QJob {
public static final String PROP_OMEGA_SEED = "ssvd.omegaseed";
@@ -68,8 +74,9 @@ public final class QJob {
private QJob() {
}
+
// public static final String OUTPUT_Q="Q";
- //public static final String OUTPUT_BT = "Bt";
+ // public static final String OUTPUT_BT = "Bt";
public static class QJobKeyWritable implements
WritableComparable<QJobKeyWritable> {
@@ -131,7 +138,7 @@ public final class QJob {
value.setBlock(qt);
getTempQw(context).append(tempKey, value); // this probably should be
- // a sparse row matrix,
+ // a sparse row matrix,
// but compressor should get it for disk and in memory we want it
// dense anyway, sparse random implementations would be
// a mostly a memory management disaster consisting of rehashes and GC
@@ -148,17 +155,24 @@ public final class QJob {
// for efficiency in most cases. Sure mapper should be able to load
// the entire split in memory -- and we don't require even that.
value.setBlock(qSolver.getThinQtTilde());
- outputs.getCollector(OUTPUT_QHAT, null).collect(key, value);
- outputs.getCollector(OUTPUT_R, null).collect(
- key,
- new VectorWritable(new DenseVector(qSolver.getRTilde().getData(),
- true)));
+ outputQHat(key, value);
+ outputR(key, new VectorWritable(new
DenseVector(qSolver.getRTilde().getData(), true)));
} else {
secondPass(ctx);
}
}
+ @SuppressWarnings("unchecked")
+ private void outputQHat(Writable key, Writable value) throws IOException {
+ outputs.getCollector(OUTPUT_QHAT, null).collect(key, value);
+ }
+
+ @SuppressWarnings("unchecked")
+ private void outputR(Writable key, Writable value) throws IOException {
+ outputs.getCollector(OUTPUT_R, null).collect(key, value);
+ }
+
private void secondPass(Context ctx) throws IOException {
qSolver = null; // release mem
FileSystem localFs = FileSystem.getLocal(ctx.getConfiguration());
@@ -166,9 +180,8 @@ public final class QJob {
closeables.addFirst(tempQr);
int qCnt = 0;
while (tempQr.next(tempKey, value)) {
- value.setBlock(GivensThinSolver.computeQtHat(value.getBlock(),
- qCnt,
- new
CopyConstructorIterator<UpperTriangular>(rSubseq.iterator())));
+ value.setBlock(GivensThinSolver.computeQtHat(value.getBlock(), qCnt,
+ new CopyConstructorIterator<UpperTriangular>(rSubseq.iterator())));
if (qCnt == 1) {
// just merge r[0] <- r[1] so it doesn't have to repeat
// in subsequent computeQHat iterators
@@ -176,22 +189,18 @@ public final class QJob {
} else {
qCnt++;
}
- outputs.getCollector(OUTPUT_QHAT, null).collect(key, value);
+ outputQHat(key, value);
}
assert rSubseq.size() == 1;
// m_value.setR(m_rSubseq.get(0));
- outputs.getCollector(OUTPUT_R, null).collect(
- key,
- new VectorWritable(new DenseVector(rSubseq.get(0).getData(),
- true)));
+ outputR(key, new VectorWritable(new
DenseVector(rSubseq.get(0).getData(), true)));
}
@Override
- protected void map(Writable key, VectorWritable value, Context context)
- throws IOException, InterruptedException {
+ protected void map(Writable key, VectorWritable value, Context context)
throws IOException, InterruptedException {
double[] yRow;
if (yLookahead.size() == kp) {
if (qSolver.isFull()) {
@@ -274,8 +283,7 @@ public final class QJob {
String taskTmpDir = System.getProperty("java.io.tmpdir");
FileSystem localFs = FileSystem.getLocal(context.getConfiguration());
tempQPath = new Path(new Path(taskTmpDir), "q-temp.seq");
- tempQw = SequenceFile.createWriter(localFs,
- context.getConfiguration(), tempQPath, IntWritable.class,
+ tempQw = SequenceFile.createWriter(localFs,
context.getConfiguration(), tempQPath, IntWritable.class,
DenseBlockWritable.class, CompressionType.BLOCK);
closeables.addFirst(tempQw);
closeables.addFirst(new IOUtils.DeleteFileOnClose(new
File(tempQPath.toString())));
@@ -295,11 +303,9 @@ public final class QJob {
int numReduceTasks) throws ClassNotFoundException,
InterruptedException, IOException {
JobConf oldApiJob = new JobConf(conf);
- MultipleOutputs.addNamedOutput(oldApiJob, OUTPUT_QHAT,
- org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
+ MultipleOutputs.addNamedOutput(oldApiJob, OUTPUT_QHAT,
org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
QJobKeyWritable.class, DenseBlockWritable.class);
- MultipleOutputs.addNamedOutput(oldApiJob, OUTPUT_R,
- org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
+ MultipleOutputs.addNamedOutput(oldApiJob, OUTPUT_R,
org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
QJobKeyWritable.class, VectorWritable.class);
Job job = new Job(oldApiJob);
@@ -316,8 +322,7 @@ public final class QJob {
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
- SequenceFileOutputFormat.setOutputCompressionType(job,
- CompressionType.BLOCK);
+ SequenceFileOutputFormat.setOutputCompressionType(job,
CompressionType.BLOCK);
job.setMapOutputKeyClass(QJobKeyWritable.class);
job.setMapOutputValueClass(VectorWritable.class);
@@ -333,9 +338,8 @@ public final class QJob {
job.getConfiguration().setInt(PROP_P, p);
// number of reduce tasks doesn't matter. we don't actually
- // send anything to reducers. in fact, the only reason
- // we need to configure reduce step is so that combiners can fire.
- // so reduce here is purely symbolic.
+ // send anything to reducers.
+
job.setNumReduceTasks(0 /* numReduceTasks */);
job.submit();
Copied:
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverDenseTest.java
(from r1094889,
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverTest.java)
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverDenseTest.java?p2=mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverDenseTest.java&p1=mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverTest.java&r1=1094889&r2=1095102&rev=1095102&view=diff
==============================================================================
---
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverTest.java
(original)
+++
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverDenseTest.java
Tue Apr 19 14:32:17 2011
@@ -38,6 +38,7 @@ import org.apache.mahout.common.RandomUt
import org.apache.mahout.math.DenseMatrix;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.SingularValueDecomposition;
+import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
import org.junit.Test;
@@ -50,7 +51,7 @@ import org.junit.Test;
* value configured.
*
*/
-public class LocalSSVDSolverTest extends MahoutTestCase {
+public class LocalSSVDSolverDenseTest extends MahoutTestCase {
private static final double s_epsilon = 1.0E-10d;
@@ -80,7 +81,7 @@ public class LocalSSVDSolverTest extends
int n = 100;
double[] row = new double[n];
- DenseVector dv = new DenseVector(row, true);
+ Vector dv = new DenseVector(row, true);
Writable vw = new VectorWritable(dv);
IntWritable roww = new IntWritable();
Copied:
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverSparseSequentialTest.java
(from r1094889,
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverTest.java)
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverSparseSequentialTest.java?p2=mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverSparseSequentialTest.java&p1=mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverTest.java&r1=1094889&r2=1095102&rev=1095102&view=diff
==============================================================================
---
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverTest.java
(original)
+++
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverSparseSequentialTest.java
Tue Apr 19 14:32:17 2011
@@ -1,176 +1,170 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.math.hadoop.stochasticsvd;
-
-import java.io.Closeable;
-import java.io.File;
-import java.util.Deque;
-import java.util.LinkedList;
-import java.util.Random;
-
-import junit.framework.Assert;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.mahout.common.MahoutTestCase;
-import org.apache.mahout.common.RandomUtils;
-import org.apache.mahout.math.DenseMatrix;
-import org.apache.mahout.math.DenseVector;
-import org.apache.mahout.math.SingularValueDecomposition;
-import org.apache.mahout.math.VectorWritable;
-import org.junit.Test;
-
-/**
- *
- * Tests SSVD solver with a made-up data running hadoop
- * solver in a local mode. It requests full-rank SSVD and
- * then compares singular values to that of Colt's SVD
- * asserting epsilon(precision) 1e-10 or whatever most recent
- * value configured.
- *
- */
-public class LocalSSVDSolverTest extends MahoutTestCase {
-
- private static final double s_epsilon = 1.0E-10d;
-
- @Test
- public void testSSVDSolver() throws Exception {
-
- Configuration conf = new Configuration();
- conf.set("mapred.job.tracker", "local");
- conf.set("fs.default.name", "file:///");
-
- // conf.set("mapred.job.tracker","localhost:11011");
- // conf.set("fs.default.name","hdfs://localhost:11010/");
-
- Deque<Closeable> closeables = new LinkedList<Closeable>();
- Random rnd = RandomUtils.getRandom();
-
- File tmpDir = getTestTempDir("svdtmp");
- conf.set("hadoop.tmp.dir", tmpDir.getAbsolutePath());
-
- Path aLocPath = new Path(getTestTempDirPath("svdtmp/A"), "A.seq");
-
- // create distributed row matrix-like struct
- SequenceFile.Writer w = SequenceFile.createWriter(
- FileSystem.getLocal(conf), conf, aLocPath, IntWritable.class,
- VectorWritable.class, CompressionType.BLOCK, new DefaultCodec());
- closeables.addFirst(w);
-
- int n = 100;
- double[] row = new double[n];
- DenseVector dv = new DenseVector(row, true);
- Writable vw = new VectorWritable(dv);
- IntWritable roww = new IntWritable();
-
- double muAmplitude = 50.0;
- int m = 1000;
- for (int i = 0; i < m; i++) {
- for (int j = 0; j < n; j++) {
- row[j] = muAmplitude * (rnd.nextDouble() - 0.5);
- }
- roww.set(i);
- w.append(roww, vw);
- }
- closeables.remove(w);
- w.close();
-
- FileSystem fs = FileSystem.get(conf);
-
- Path tempDirPath = getTestTempDirPath("svd-proc");
- Path aPath = new Path(tempDirPath, "A/A.seq");
- fs.copyFromLocalFile(aLocPath, aPath);
-
- Path svdOutPath = new Path(tempDirPath, "SSVD-out");
-
- // make sure we wipe out previous test results, just a convenience
- fs.delete(svdOutPath, true);
-
- int ablockRows = 251;
- int p = 60;
- int k = 40;
- SSVDSolver ssvd = new SSVDSolver(conf, new Path[] { aPath }, svdOutPath,
- ablockRows, k, p, 3);
- // ssvd.setcUHalfSigma(true);
- // ssvd.setcVHalfSigma(true);
- ssvd.setOverwrite(true);
- ssvd.run();
-
- double[] stochasticSValues = ssvd.getSingularValues();
- System.out.println("--SSVD solver singular values:");
- dumpSv(stochasticSValues);
- System.out.println("--Colt SVD solver singular values:");
-
- // try to run the same thing without stochastic algo
- double[][] a = SSVDSolver.loadDistributedRowMatrix(fs, aPath, conf);
-
- // SingularValueDecompositionImpl svd=new
SingularValueDecompositionImpl(new
- // Array2DRowRealMatrix(a));
- SingularValueDecomposition svd2 = new SingularValueDecomposition(
- new DenseMatrix(a));
-
- a = null;
-
- double[] svalues2 = svd2.getSingularValues();
- dumpSv(svalues2);
-
- for (int i = 0; i < k + p; i++) {
- Assert.assertTrue(Math.abs(svalues2[i] - stochasticSValues[i]) <=
s_epsilon);
- }
-
- double[][] q = SSVDSolver.loadDistributedRowMatrix(fs, new Path(svdOutPath,
- "Bt-job/" + BtJob.OUTPUT_Q + "-*"), conf);
-
- SSVDPrototypeTest.assertOrthonormality(new DenseMatrix(q), false,
s_epsilon);
-
- double[][] u = SSVDSolver.loadDistributedRowMatrix(fs, new Path(svdOutPath,
-
"U/[^_]*"), conf);
-
- SSVDPrototypeTest.assertOrthonormality(new DenseMatrix(u), false,
s_epsilon);
- double[][] v = SSVDSolver.loadDistributedRowMatrix(fs, new Path(svdOutPath,
- "V/[^_]*"), conf);
-
- SSVDPrototypeTest
- .assertOrthonormality(new DenseMatrix(v), false, s_epsilon);
- }
-
- static void dumpSv(double[] s) {
- System.out.printf("svs: ");
- for (double value : s) {
- System.out.printf("%f ", value);
- }
- System.out.println();
-
- }
-
- static void dump(double[][] matrix) {
- for (double[] aMatrix : matrix) {
- for (double anAMatrix : aMatrix) {
- System.out.printf("%f ", anAMatrix);
- }
- System.out.println();
- }
- }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.stochasticsvd;
+
+import java.io.Closeable;
+import java.io.File;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.Random;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.RandomUtils;
+import org.apache.mahout.math.DenseMatrix;
+import org.apache.mahout.math.SequentialAccessSparseVector;
+import org.apache.mahout.math.SingularValueDecomposition;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.junit.Test;
+
+/**
+ *
+ * Tests the SSVD solver on made-up sparse data, running the Hadoop solver in
+ * local mode. It requests a full-rank SSVD and then compares the singular
+ * values to those of Colt's SVD, asserting a precision (epsilon) of 1e-10 or
+ * whatever value is currently configured.
+ *
+ */
+public class LocalSSVDSolverSparseSequentialTest extends MahoutTestCase {
+
+ private static final double s_epsilon = 1.0E-10d;
+
+ @Test
+ public void testSSVDSolver() throws Exception {
+
+ Configuration conf = new Configuration();
+ conf.set("mapred.job.tracker", "local");
+ conf.set("fs.default.name", "file:///");
+
+ // conf.set("mapred.job.tracker","localhost:11011");
+ // conf.set("fs.default.name","hdfs://localhost:11010/");
+
+ Deque<Closeable> closeables = new LinkedList<Closeable>();
+ Random rnd = RandomUtils.getRandom();
+
+ File tmpDir = getTestTempDir("svdtmp");
+ conf.set("hadoop.tmp.dir", tmpDir.getAbsolutePath());
+
+ Path aLocPath = new Path(getTestTempDirPath("svdtmp/A"), "A.seq");
+
+ // create distributed row matrix-like struct
+ SequenceFile.Writer w =
SequenceFile.createWriter(FileSystem.getLocal(conf), conf, aLocPath,
IntWritable.class,
+ VectorWritable.class, CompressionType.BLOCK, new DefaultCodec());
+ closeables.addFirst(w);
+
+ int n = 100;
+ Vector dv;
+ VectorWritable vw = new VectorWritable();
+ IntWritable roww = new IntWritable();
+
+ double muAmplitude = 50.0;
+ int m = 1000;
+ for (int i = 0; i < m; i++) {
+ dv=new SequentialAccessSparseVector(n);
+ for (int j = 0; j < n / 5; j++) {
+ dv.setQuick(rnd.nextInt(n), muAmplitude * (rnd.nextDouble() - 0.5));
+ }
+ roww.set(i);
+ vw.set(dv);
+ w.append(roww, vw);
+ }
+ closeables.remove(w);
+ w.close();
+
+ FileSystem fs = FileSystem.get(conf);
+
+ Path tempDirPath = getTestTempDirPath("svd-proc");
+ Path aPath = new Path(tempDirPath, "A/A.seq");
+ fs.copyFromLocalFile(aLocPath, aPath);
+
+ Path svdOutPath = new Path(tempDirPath, "SSVD-out");
+
+ // make sure we wipe out previous test results, just a convenience
+ fs.delete(svdOutPath, true);
+
+ int ablockRows = 251;
+ int p = 60;
+ int k = 40;
+ SSVDSolver ssvd = new SSVDSolver(conf, new Path[] { aPath }, svdOutPath,
ablockRows, k, p, 3);
+ // ssvd.setcUHalfSigma(true);
+ // ssvd.setcVHalfSigma(true);
+ ssvd.setOverwrite(true);
+ ssvd.run();
+
+ double[] stochasticSValues = ssvd.getSingularValues();
+ System.out.println("--SSVD solver singular values:");
+ dumpSv(stochasticSValues);
+ System.out.println("--Colt SVD solver singular values:");
+
+ // try to run the same thing without stochastic algo
+ double[][] a = SSVDSolver.loadDistributedRowMatrix(fs, aPath, conf);
+
+ // SingularValueDecompositionImpl svd=new
SingularValueDecompositionImpl(new
+ // Array2DRowRealMatrix(a));
+ SingularValueDecomposition svd2 = new SingularValueDecomposition(new
DenseMatrix(a));
+
+ a = null;
+
+ double[] svalues2 = svd2.getSingularValues();
+ dumpSv(svalues2);
+
+ for (int i = 0; i < k + p; i++) {
+ Assert.assertTrue(Math.abs(svalues2[i] - stochasticSValues[i]) <=
s_epsilon);
+ }
+
+ double[][] q = SSVDSolver.loadDistributedRowMatrix(fs, new
Path(svdOutPath, "Bt-job/" + BtJob.OUTPUT_Q + "-*"),
+ conf);
+
+ SSVDPrototypeTest.assertOrthonormality(new DenseMatrix(q), false,
s_epsilon);
+
+ double[][] u = SSVDSolver.loadDistributedRowMatrix(fs, new
Path(svdOutPath, "U/[^_]*"), conf);
+
+ SSVDPrototypeTest.assertOrthonormality(new DenseMatrix(u), false,
s_epsilon);
+ double[][] v = SSVDSolver.loadDistributedRowMatrix(fs, new
Path(svdOutPath, "V/[^_]*"), conf);
+
+ SSVDPrototypeTest.assertOrthonormality(new DenseMatrix(v), false,
s_epsilon);
+ }
+
+ static void dumpSv(double[] s) {
+ System.out.printf("svs: ");
+ for (double value : s) {
+ System.out.printf("%f ", value);
+ }
+ System.out.println();
+
+ }
+
+ static void dump(double[][] matrix) {
+ for (double[] aMatrix : matrix) {
+ for (double anAMatrix : aMatrix) {
+ System.out.printf("%f ", anAMatrix);
+ }
+ System.out.println();
+ }
+ }
+
+}