Author: jeastman
Date: Tue Sep 14 17:35:51 2010
New Revision: 997007
URL: http://svn.apache.org/viewvc?rev=997007&view=rev
Log:
MAHOUT-294: Added arguments to the DistributedLanczosSolver CLI so that
it can optionally launch the EigenVerificationJob. Added a new run() method with
additional arguments needed by EVJ. Added new unit tests of the CLI and adjusted
some of the cluster dumper tests to use the new run method. All tests run.
NOTE: this patch changes the semantics of the DLS --output argument: Formerly
it was the path to the output (raw) eigenvectors file. Now it is a path to an
output
directory in which a rawEigenvectors file will be written. If the EVJ is also
specified,
then the output directory will contain a cleanEigenvectors file in addition.
Added:
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/decomposer/TestDistributedLanczosSolverCLI.java
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/DistributedLanczosSolver.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java
mahout/trunk/utils/src/test/java/org/apache/mahout/clustering/TestClusterDumper.java
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/DistributedLanczosSolver.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/DistributedLanczosSolver.java?rev=997007&r1=997006&r2=997007&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/DistributedLanczosSolver.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/DistributedLanczosSolver.java
Tue Sep 14 17:35:51 2010
@@ -17,6 +17,11 @@
package org.apache.mahout.math.hadoop.decomposer;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -37,19 +42,15 @@ import org.apache.mahout.math.hadoop.Dis
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-
public class DistributedLanczosSolver extends LanczosSolver implements Tool {
+ public static final String RAW_EIGENVECTORS = "rawEigenvectors";
+
private static final Logger log =
LoggerFactory.getLogger(DistributedLanczosSolver.class);
private Configuration conf;
- private Map<String,String> parsedArgs;
+ private Map<String, String> parsedArgs;
/**
* For the distributed case, the best guess at a useful initialization state
for Lanczos we'll chose to be
@@ -65,33 +66,96 @@ public class DistributedLanczosSolver ex
@Override
public int run(String[] strings) throws Exception {
Path inputPath = new Path(parsedArgs.get("--input"));
- Path outputEigenVectorPath = new Path(parsedArgs.get("--output"));
+ Path outputPath = new Path(parsedArgs.get("--output"));
Path outputTmpPath = new Path(parsedArgs.get("--tempDir"));
int numRows = Integer.parseInt(parsedArgs.get("--numRows"));
int numCols = Integer.parseInt(parsedArgs.get("--numCols"));
boolean isSymmetric = Boolean.parseBoolean(parsedArgs.get("--symmetric"));
int desiredRank = Integer.parseInt(parsedArgs.get("--rank"));
- return run(inputPath, outputTmpPath, outputEigenVectorPath, numRows,
numCols, isSymmetric, desiredRank);
+
+ boolean cleansvd = Boolean.parseBoolean(parsedArgs.get("--cleansvd"));
+ if (cleansvd) {
+ double maxError = Double.parseDouble(parsedArgs.get("--maxError"));
+ double minEigenvalue =
Double.parseDouble(parsedArgs.get("--minEigenvalue"));
+ boolean inMemory = Boolean.parseBoolean(parsedArgs.get("--inMemory"));
+ return run(inputPath,
+ outputPath,
+ outputTmpPath,
+ numRows,
+ numCols,
+ isSymmetric,
+ desiredRank,
+ maxError,
+ minEigenvalue,
+ inMemory);
+ }
+ return run(inputPath, outputPath, outputTmpPath, numRows, numCols,
isSymmetric, desiredRank);
}
+ /**
+ * Run the solver to produce raw eigenvectors, then run the
EigenVerificationJob to clean them
+ *
+ * @param inputPath the Path to the input corpus
+ * @param outputPath the Path to the output
+ * @param outputTmpPath a Path to a temporary working directory
+ * @param numRows the int number of rows
+ * @param numCols the int number of columns
+ * @param isSymmetric true if the input matrix is symmetric
+ * @param desiredRank the int desired rank of eigenvectors to produce
+ * @param maxError the maximum allowable error
+ * @param minEigenvalue the minimum usable eigenvalue
+ * @param inMemory true if the verification can be done in memory
+ * @return an int indicating success (0) or otherwise
+ * @throws Exception
+ */
public int run(Path inputPath,
+ Path outputPath,
Path outputTmpPath,
- Path outputEigenVectorPath, int numRows,
+ int numRows,
int numCols,
boolean isSymmetric,
- int desiredRank) throws Exception {
- Configuration originalConfig = getConf();
+ int desiredRank,
+ double maxError,
+ double minEigenvalue,
+ boolean inMemory) throws Exception {
+ int result = run(inputPath, outputPath, outputTmpPath, numRows, numCols,
isSymmetric, desiredRank);
+ if (result != 0) {
+ return result;
+ }
+ Path rawEigenVectorPath = new Path(outputPath, RAW_EIGENVECTORS);
+ return new EigenVerificationJob().run(inputPath,
+ rawEigenVectorPath,
+ outputPath,
+ outputTmpPath,
+ maxError,
+ minEigenvalue,
+ inMemory,
+ getConf() != null ? new
JobConf(getConf()) : new JobConf());
+ }
+
+ /**
+ * Run the solver to produce the raw eigenvectors
+ *
+ * @param inputPath the Path to the input corpus
+ * @param outputPath the Path to the output
+ * @param outputTmpPath a Path to a temporary working directory
+ * @param numRows the int number of rows
+ * @param numCols the int number of columns
+ * @param isSymmetric true if the input matrix is symmetric
+ * @param desiredRank the int desired rank of eigenvectors to produce
+ * @return an int indicating success (0) or otherwise
+ * @throws Exception
+ */
+ public int run(Path inputPath, Path outputPath, Path outputTmpPath, int
numRows, int numCols, boolean isSymmetric, int desiredRank)
+ throws Exception {
Matrix eigenVectors = new DenseMatrix(desiredRank, numCols);
List<Double> eigenValues = new ArrayList<Double>();
-
- DistributedRowMatrix matrix = new DistributedRowMatrix(inputPath,
- outputTmpPath,
- numRows,
- numCols);
- matrix.configure(new JobConf(originalConfig));
+ DistributedRowMatrix matrix = new DistributedRowMatrix(inputPath,
outputTmpPath, numRows, numCols);
+ matrix.configure(new JobConf(getConf() != null ? getConf() : new
Configuration()));
solve(matrix, desiredRank, eigenVectors, eigenValues, isSymmetric);
+ Path outputEigenVectorPath = new Path(outputPath, RAW_EIGENVECTORS);
serializeOutput(eigenVectors, eigenValues, outputEigenVectorPath);
return 0;
}
@@ -105,7 +169,7 @@ public class DistributedLanczosSolver ex
*/
public void serializeOutput(Matrix eigenVectors, List<Double> eigenValues,
Path outputPath) throws IOException {
log.info("Persisting {} eigenVectors and eigenValues to: {}",
eigenVectors.numRows(), outputPath);
- Configuration conf = getConf();
+ Configuration conf = getConf() != null ? getConf() : new Configuration();
FileSystem fs = FileSystem.get(conf);
SequenceFile.Writer seqWriter = new SequenceFile.Writer(fs, conf,
outputPath, IntWritable.class, VectorWritable.class);
IntWritable iw = new IntWritable();
@@ -154,8 +218,13 @@ public class DistributedLanczosSolver ex
addOption("numRows", "nr", "Number of rows of the input matrix");
addOption("numCols", "nc", "Number of columns of the input matrix");
addOption("rank", "r", "Desired decomposition rank (note: only roughly
1/4 to 1/3 "
- + "of these will have the top portion of the
spectrum)");
+ + "of these will have the top portion of the spectrum)");
addOption("symmetric", "sym", "Is the input matrix square and
symmetric?");
+ // options required to run cleansvd job
+ addOption("cleansvd", "cl", "Run the EigenVerificationJob to clean the
eigenvectors after SVD", false);
+ addOption("maxError", "err", "Maximum acceptable error", "0.05");
+ addOption("minEigenvalue", "mev", "Minimum eigenvalue to keep the vector
for", "0.0");
+ addOption("inMemory", "mem", "Buffer eigen matrix into memory (if you
have enough!)", "false");
DistributedLanczosSolver.this.parsedArgs = parseArguments(args);
if (DistributedLanczosSolver.this.parsedArgs == null) {
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java?rev=997007&r1=997006&r2=997007&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java
Tue Sep 14 17:35:51 2010
@@ -68,7 +68,7 @@ import org.slf4j.LoggerFactory;
*/
public class EigenVerificationJob extends AbstractJob {
- public static final String LARGEST_CLEAN_EIGENS = "largestCleanEigens";
+ public static final String CLEAN_EIGENVECTORS = "cleanEigenvectors";
private static final Logger log =
LoggerFactory.getLogger(EigenVerificationJob.class);
@@ -131,7 +131,7 @@ public class EigenVerificationJob extend
* @param config the JobConf to use, or null if a default is ok (saves
referencing JobConf in calling classes unless needed)
* @throws IOException
*/
- public void run(Path corpusInput,
+ public int run(Path corpusInput,
Path eigenInput,
Path output,
Path tempOut,
@@ -164,6 +164,7 @@ public class EigenVerificationJob extend
List<Map.Entry<MatrixSlice, EigenStatus>> prunedEigenMeta =
pruneEigens(eigenMetaData);
saveCleanEigens(prunedEigenMeta);
+ return 0;
}
private Map<String, String> handleArgs(String[] args) {
@@ -187,7 +188,7 @@ public class EigenVerificationJob extend
}
private void saveCleanEigens(List<Map.Entry<MatrixSlice, EigenStatus>>
prunedEigenMeta) throws IOException {
- Path path = new Path(outPath, LARGEST_CLEAN_EIGENS);
+ Path path = new Path(outPath, CLEAN_EIGENVECTORS);
FileSystem fs = FileSystem.get(conf);
SequenceFile.Writer seqWriter = new SequenceFile.Writer(fs, conf, path,
IntWritable.class, VectorWritable.class);
IntWritable iw = new IntWritable();
Added:
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/decomposer/TestDistributedLanczosSolverCLI.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/decomposer/TestDistributedLanczosSolverCLI.java?rev=997007&view=auto
==============================================================================
---
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/decomposer/TestDistributedLanczosSolverCLI.java
(added)
+++
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/decomposer/TestDistributedLanczosSolverCLI.java
Tue Sep 14 17:35:51 2010
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.hadoop.decomposer;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.mahout.clustering.AbstractCluster;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.math.DenseMatrix;
+import org.apache.mahout.math.Matrix;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.hadoop.DistributedRowMatrix;
+import org.apache.mahout.math.hadoop.TestDistributedRowMatrix;
+import
org.apache.mahout.math.hadoop.decomposer.DistributedLanczosSolver.DistributedLanczosSolverJob;
+import org.junit.Test;
+
+public final class TestDistributedLanczosSolverCLI extends MahoutTestCase {
+
+ @Test
+ public void testDistributedLanczosSolverCLI() throws Exception {
+ Path testData = getTestTempDirPath("testdata");
+ DistributedRowMatrix corpus = new
TestDistributedRowMatrix().randomDistributedMatrix(500, 450, 500, 10, 10.0,
true, testData
+ .toString());
+ corpus.configure(new JobConf());
+ Path output = getTestTempDirPath("output");
+ Path tmp = getTestTempDirPath("tmp");
+ String[] args = { "-i", new Path(testData, "distMatrix").toString(), "-o",
output.toString(), "--tempDir", tmp.toString(), "--numRows", "500",
+ "--numCols", "500", "--rank", "10", "--symmetric", "true" };
+ new DistributedLanczosSolver().new DistributedLanczosSolverJob().run(args);
+
+ Path rawEigenvectors = new Path(output,
DistributedLanczosSolver.RAW_EIGENVECTORS);
+ Matrix eigenVectors = new DenseMatrix(10, corpus.numCols());
+ Configuration conf = new Configuration();
+
+ FileSystem fs = FileSystem.get(rawEigenvectors.toUri(), conf);
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, rawEigenvectors,
conf);
+ try {
+ Writable key =
reader.getKeyClass().asSubclass(Writable.class).newInstance();
+ Writable value =
reader.getValueClass().asSubclass(Writable.class).newInstance();
+ int i = 0;
+ while (reader.next(key, value)) {
+ Vector v = ((VectorWritable) value).get();
+ eigenVectors.assignRow(i, v);
+ System.out.println("k=" + key.toString() + " V=" +
AbstractCluster.formatVector(v, null));
+ value =
reader.getValueClass().asSubclass(Writable.class).newInstance();
+ i++;
+ }
+ assertEquals("number of eigenvectors", 9, i);
+ } finally {
+ reader.close();
+ }
+ }
+
+ @Test
+ public void testDistributedLanczosSolverEVJCLI() throws Exception {
+ Path testData = getTestTempDirPath("testdata");
+ DistributedRowMatrix corpus = new
TestDistributedRowMatrix().randomDistributedMatrix(500, 450, 500, 10, 10.0,
true, testData
+ .toString());
+ corpus.configure(new JobConf());
+ Path output = getTestTempDirPath("output");
+ Path tmp = getTestTempDirPath("tmp");
+ String[] args = { "-i", new Path(testData, "distMatrix").toString(), "-o",
output.toString(), "--tempDir", tmp.toString(), "--numRows", "500",
+ "--numCols", "500", "--rank", "10", "--symmetric", "true",
"--cleansvd", "true" };
+ new DistributedLanczosSolver().new DistributedLanczosSolverJob().run(args);
+
+ Path cleanEigenvectors = new Path(output,
EigenVerificationJob.CLEAN_EIGENVECTORS);
+ Matrix eigenVectors = new DenseMatrix(10, corpus.numCols());
+ Configuration conf = new Configuration();
+
+ FileSystem fs = FileSystem.get(cleanEigenvectors.toUri(), conf);
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs,
cleanEigenvectors, conf);
+ try {
+ Writable key =
reader.getKeyClass().asSubclass(Writable.class).newInstance();
+ Writable value =
reader.getValueClass().asSubclass(Writable.class).newInstance();
+ int i = 0;
+ while (reader.next(key, value)) {
+ Vector v = ((VectorWritable) value).get();
+ eigenVectors.assignRow(i, v);
+ System.out.println("k=" + key.toString() + " V=" +
AbstractCluster.formatVector(v, null));
+ value =
reader.getValueClass().asSubclass(Writable.class).newInstance();
+ i++;
+ }
+ assertEquals("number of clean eigenvectors", 4, i);
+ } finally {
+ reader.close();
+ }
+ }
+
+}
Modified:
mahout/trunk/utils/src/test/java/org/apache/mahout/clustering/TestClusterDumper.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/utils/src/test/java/org/apache/mahout/clustering/TestClusterDumper.java?rev=997007&r1=997006&r2=997007&view=diff
==============================================================================
---
mahout/trunk/utils/src/test/java/org/apache/mahout/clustering/TestClusterDumper.java
(original)
+++
mahout/trunk/utils/src/test/java/org/apache/mahout/clustering/TestClusterDumper.java
Tue Sep 14 17:35:51 2010
@@ -39,6 +39,7 @@ import org.apache.lucene.store.RAMDirect
import org.apache.lucene.util.Version;
import org.apache.mahout.clustering.canopy.CanopyDriver;
import org.apache.mahout.clustering.dirichlet.DirichletDriver;
+import
org.apache.mahout.clustering.dirichlet.models.AbstractVectorModelDistribution;
import
org.apache.mahout.clustering.dirichlet.models.DistanceMeasureClusterDistribution;
import
org.apache.mahout.clustering.dirichlet.models.GaussianClusterDistribution;
import org.apache.mahout.clustering.dirichlet.models.SampledNormalDistribution;
@@ -71,23 +72,14 @@ import org.junit.Test;
public final class TestClusterDumper extends MahoutTestCase {
- private static final String[] DOCS = {
- "The quick red fox jumped over the lazy brown dogs.",
- "The quick brown fox jumped over the lazy red dogs.",
- "The quick red cat jumped over the lazy brown dogs.",
- "The quick brown cat jumped over the lazy red dogs.",
- "Mary had a little lamb whose fleece was white as snow.",
- "Mary had a little goat whose fleece was white as snow.",
- "Mary had a little lamb whose fleece was black as tar.",
- "Dick had a little goat whose fleece was white as snow.",
- "Moby Dick is a story of a whale and a man obsessed.",
- "Moby Bob is a story of a walrus and a man obsessed.",
- "Moby Dick is a story of a whale and a crazy man.",
- "The robber wore a black fleece jacket and a baseball cap.",
- "The robber wore a red fleece jacket and a baseball cap.",
- "The robber wore a white fleece jacket and a baseball cap.",
- "The English Springer Spaniel is the best of all dogs."
- };
+ private static final String[] DOCS = { "The quick red fox jumped over the
lazy brown dogs.",
+ "The quick brown fox jumped over the lazy red dogs.", "The quick red cat
jumped over the lazy brown dogs.",
+ "The quick brown cat jumped over the lazy red dogs.", "Mary had a little
lamb whose fleece was white as snow.",
+ "Mary had a little goat whose fleece was white as snow.", "Mary had a
little lamb whose fleece was black as tar.",
+ "Dick had a little goat whose fleece was white as snow.", "Moby Dick is
a story of a whale and a man obsessed.",
+ "Moby Bob is a story of a walrus and a man obsessed.", "Moby Dick is a
story of a whale and a crazy man.",
+ "The robber wore a black fleece jacket and a baseball cap.", "The robber
wore a red fleece jacket and a baseball cap.",
+ "The robber wore a white fleece jacket and a baseball cap.", "The
English Springer Spaniel is the best of all dogs." };
private List<VectorWritable> sampleData;
private String[] termDictionary;
@@ -164,8 +156,7 @@ public final class TestClusterDumper ext
Path output = getTestTempDirPath("output");
CanopyDriver.runJob(getTestTempDirPath("testdata"), output, measure, 8, 4,
true, false);
// run ClusterDumper
- ClusterDumper clusterDumper =
- new ClusterDumper(new Path(output, "clusters-0"), new Path(output,
"clusteredPoints"));
+ ClusterDumper clusterDumper = new ClusterDumper(new Path(output,
"clusters-0"), new Path(output, "clusteredPoints"));
clusterDumper.printClusters(termDictionary);
}
@@ -176,11 +167,9 @@ public final class TestClusterDumper ext
Path output = getTestTempDirPath("output");
CanopyDriver.runJob(getTestTempDirPath("testdata"), output, measure, 8, 4,
false, false);
// now run the KMeans job
- KMeansDriver.runJob(getTestTempDirPath("testdata"),
- new Path(output, "clusters-0"), output, measure,
0.001, 10, 1, true, false);
+ KMeansDriver.runJob(getTestTempDirPath("testdata"), new Path(output,
"clusters-0"), output, measure, 0.001, 10, 1, true, false);
// run ClusterDumper
- ClusterDumper clusterDumper =
- new ClusterDumper(new Path(output, "clusters-2"), new Path(output,
"clusteredPoints"));
+ ClusterDumper clusterDumper = new ClusterDumper(new Path(output,
"clusters-2"), new Path(output, "clusteredPoints"));
clusterDumper.printClusters(termDictionary);
}
@@ -204,8 +193,7 @@ public final class TestClusterDumper ext
0,
false);
// run ClusterDumper
- ClusterDumper clusterDumper =
- new ClusterDumper(new Path(output, "clusters-3"), new Path(output,
"clusteredPoints"));
+ ClusterDumper clusterDumper = new ClusterDumper(new Path(output,
"clusters-3"), new Path(output, "clusteredPoints"));
clusterDumper.printClusters(termDictionary);
}
@@ -213,11 +201,9 @@ public final class TestClusterDumper ext
public void testMeanShift() throws Exception {
DistanceMeasure measure = new CosineDistanceMeasure();
Path output = getTestTempDirPath("output");
- MeanShiftCanopyDriver.runJob(
- getTestTempDirPath("testdata"), output, measure, 0.5, 0.01, 0.05, 10,
false, true, false);
+ MeanShiftCanopyDriver.runJob(getTestTempDirPath("testdata"), output,
measure, 0.5, 0.01, 0.05, 10, false, true, false);
// run ClusterDumper
- ClusterDumper clusterDumper =
- new ClusterDumper(new Path(output, "clusters-1"), new Path(output,
"clusteredPoints"));
+ ClusterDumper clusterDumper = new ClusterDumper(new Path(output,
"clusters-1"), new Path(output, "clusteredPoints"));
clusterDumper.printClusters(termDictionary);
}
@@ -225,13 +211,10 @@ public final class TestClusterDumper ext
public void testDirichlet() throws Exception {
Path output = getTestTempDirPath("output");
NamedVector prototype = (NamedVector) sampleData.get(0).get();
- ModelDistribution<VectorWritable> modelDistribution =
- new SampledNormalDistribution(new VectorWritable(prototype));
- DirichletDriver.runJob(
- getTestTempDirPath("testdata"), output, modelDistribution, 15, 10,
1.0, 1, true, true, 0, false);
+ AbstractVectorModelDistribution modelDistribution = new
SampledNormalDistribution(new VectorWritable(prototype));
+ DirichletDriver.runJob(getTestTempDirPath("testdata"), output,
modelDistribution, 15, 10, 1.0, 1, true, true, 0, false);
// run ClusterDumper
- ClusterDumper clusterDumper =
- new ClusterDumper(new Path(output, "clusters-10"), new Path(output,
"clusteredPoints"));
+ ClusterDumper clusterDumper = new ClusterDumper(new Path(output,
"clusters-10"), new Path(output, "clusteredPoints"));
clusterDumper.printClusters(termDictionary);
}
@@ -239,13 +222,10 @@ public final class TestClusterDumper ext
public void testDirichlet2() throws Exception {
Path output = getTestTempDirPath("output");
NamedVector prototype = (NamedVector) sampleData.get(0).get();
- ModelDistribution<VectorWritable> modelDistribution =
- new GaussianClusterDistribution(new VectorWritable(prototype));
- DirichletDriver.runJob(
- getTestTempDirPath("testdata"), output, modelDistribution, 15, 10,
1.0, 1, true, true, 0, true);
+ AbstractVectorModelDistribution modelDistribution = new
GaussianClusterDistribution(new VectorWritable(prototype));
+ DirichletDriver.runJob(getTestTempDirPath("testdata"), output,
modelDistribution, 15, 10, 1.0, 1, true, true, 0, true);
// run ClusterDumper
- ClusterDumper clusterDumper =
- new ClusterDumper(new Path(output, "clusters-10"), new Path(output,
"clusteredPoints"));
+ ClusterDumper clusterDumper = new ClusterDumper(new Path(output,
"clusters-10"), new Path(output, "clusteredPoints"));
clusterDumper.printClusters(termDictionary);
}
@@ -253,13 +233,10 @@ public final class TestClusterDumper ext
public void testDirichlet3() throws Exception {
Path output = getTestTempDirPath("output");
NamedVector prototype = (NamedVector) sampleData.get(0).get();
- ModelDistribution<VectorWritable> modelDistribution =
- new DistanceMeasureClusterDistribution(new VectorWritable(prototype));
- DirichletDriver.runJob(
- getTestTempDirPath("testdata"), output, modelDistribution, 15, 10,
1.0, 1, true, true, 0, true);
+ AbstractVectorModelDistribution modelDistribution = new
DistanceMeasureClusterDistribution(new VectorWritable(prototype));
+ DirichletDriver.runJob(getTestTempDirPath("testdata"), output,
modelDistribution, 15, 10, 1.0, 1, true, true, 0, true);
// run ClusterDumper
- ClusterDumper clusterDumper =
- new ClusterDumper(new Path(output, "clusters-10"), new Path(output,
"clusteredPoints"));
+ ClusterDumper clusterDumper = new ClusterDumper(new Path(output,
"clusters-10"), new Path(output, "clusteredPoints"));
clusterDumper.printClusters(termDictionary);
}
@@ -268,17 +245,14 @@ public final class TestClusterDumper ext
DistanceMeasure measure = new EuclideanDistanceMeasure();
Path output = getTestTempDirPath("output");
Path tmp = getTestTempDirPath("tmp");
- Path eigenvectors = new Path(output, "eigenvectors");
+ int desiredRank = 15;
DistributedLanczosSolver solver = new DistributedLanczosSolver();
Configuration conf = new Configuration();
solver.setConf(conf);
Path testData = getTestTempDirPath("testdata");
int sampleDimension = sampleData.get(0).get().size();
- int desiredRank = 15;
- solver.run(testData, tmp, eigenvectors, sampleData.size(),
sampleDimension, false, desiredRank);
-
- new EigenVerificationJob().run(testData, eigenvectors, output, tmp, 0.5,
0.0, true, null);
- Path cleanEigenvectors = new Path(output,
EigenVerificationJob.LARGEST_CLEAN_EIGENS);
+ solver.run(testData, output, tmp, sampleData.size(), sampleDimension,
false, desiredRank, 0.5, 0.0, true);
+ Path cleanEigenvectors = new Path(output,
EigenVerificationJob.CLEAN_EIGENVECTORS);
// build in-memory data matrix A
Matrix a = new DenseMatrix(sampleData.size(), sampleDimension);
@@ -327,8 +301,7 @@ public final class TestClusterDumper ext
// now run the KMeans job
KMeansDriver.runJob(svdData, new Path(output, "clusters-0"), output,
measure, 0.001, 10, 1, true, false);
// run ClusterDumper
- ClusterDumper clusterDumper =
- new ClusterDumper(new Path(output, "clusters-2"), new Path(output,
"clusteredPoints"));
+ ClusterDumper clusterDumper = new ClusterDumper(new Path(output,
"clusters-2"), new Path(output, "clusteredPoints"));
clusterDumper.printClusters(termDictionary);
}
@@ -337,17 +310,50 @@ public final class TestClusterDumper ext
DistanceMeasure measure = new EuclideanDistanceMeasure();
Path output = getTestTempDirPath("output");
Path tmp = getTestTempDirPath("tmp");
- Path eigenvectors = new Path(output, "eigenvectors");
+ int desiredRank = 13;
DistributedLanczosSolver solver = new DistributedLanczosSolver();
Configuration config = new Configuration();
solver.setConf(config);
Path testData = getTestTempDirPath("testdata");
int sampleDimension = sampleData.get(0).get().size();
+ // Run EigenVerificationJob from within DistributedLanczosSolver.run(...)
+ solver.run(testData, output, tmp, sampleData.size(), sampleDimension,
false, desiredRank, 0.5, 0.0, false);
+ Path cleanEigenvectors = new Path(output,
EigenVerificationJob.CLEAN_EIGENVECTORS);
+
+ // now multiply the testdata matrix and the eigenvector matrix
+ DistributedRowMatrix svdT = new DistributedRowMatrix(cleanEigenvectors,
tmp, desiredRank - 1, sampleDimension);
+ JobConf conf = new JobConf(config);
+ svdT.configure(conf);
+ DistributedRowMatrix a = new DistributedRowMatrix(testData, tmp,
sampleData.size(), sampleDimension);
+ a.configure(conf);
+ DistributedRowMatrix sData = a.transpose().times(svdT.transpose());
+ sData.configure(conf);
+
+ // now run the Canopy job to prime kMeans canopies
+ CanopyDriver.runJob(sData.getRowPath(), output, measure, 8, 4, false,
false);
+ // now run the KMeans job
+ KMeansDriver.runJob(sData.getRowPath(), new Path(output, "clusters-0"),
output, measure, 0.001, 10, 1, true, false);
+ // run ClusterDumper
+ ClusterDumper clusterDumper = new ClusterDumper(new Path(output,
"clusters-2"), new Path(output, "clusteredPoints"));
+ clusterDumper.printClusters(termDictionary);
+ }
+
+ @Test
+ public void testKmeansDSVD2() throws Exception {
+ DistanceMeasure measure = new EuclideanDistanceMeasure();
+ Path output = getTestTempDirPath("output");
+ Path tmp = getTestTempDirPath("tmp");
int desiredRank = 13;
- solver.run(testData, tmp, eigenvectors, sampleData.size(),
sampleDimension, false, desiredRank);
-
- new EigenVerificationJob().run(testData, eigenvectors, output, tmp, 0.5,
0.0, false, null);
- Path cleanEigenvectors = new Path(output,
EigenVerificationJob.LARGEST_CLEAN_EIGENS);
+ DistributedLanczosSolver solver = new DistributedLanczosSolver();
+ Configuration config = new Configuration();
+ solver.setConf(config);
+ Path testData = getTestTempDirPath("testdata");
+ int sampleDimension = sampleData.get(0).get().size();
+ // call EigenVerificationJob separately
+ solver.run(testData, output, tmp, sampleData.size(), sampleDimension,
false, desiredRank);
+ Path rawEigenvectors = new Path(output,
DistributedLanczosSolver.RAW_EIGENVECTORS);
+ new EigenVerificationJob().run(testData, rawEigenvectors, output, tmp,
0.5, 0.0, true, null);
+ Path cleanEigenvectors = new Path(output,
EigenVerificationJob.CLEAN_EIGENVECTORS);
// now multiply the testdata matrix and the eigenvector matrix
DistributedRowMatrix svdT = new DistributedRowMatrix(cleanEigenvectors,
tmp, desiredRank - 1, sampleDimension);
@@ -363,8 +369,7 @@ public final class TestClusterDumper ext
// now run the KMeans job
KMeansDriver.runJob(sData.getRowPath(), new Path(output, "clusters-0"),
output, measure, 0.001, 10, 1, true, false);
// run ClusterDumper
- ClusterDumper clusterDumper =
- new ClusterDumper(new Path(output, "clusters-2"), new Path(output,
"clusteredPoints"));
+ ClusterDumper clusterDumper = new ClusterDumper(new Path(output,
"clusters-2"), new Path(output, "clusteredPoints"));
clusterDumper.printClusters(termDictionary);
}
}