Author: squinn
Date: Mon Mar 11 19:06:58 2013
New Revision: 1455286
URL: http://svn.apache.org/r1455286
Log:
Apply patch 1159, which adds SSVD as an optional eigensolver for SpectralKMeans.
Also make minor tweaks to the Path structure.
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplaySpectralKMeans.java
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java?rev=1455286&r1=1455285&r2=1455286&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java
Mon Mar 11 19:06:58 2013
@@ -17,6 +17,10 @@
package org.apache.mahout.clustering.spectral.kmeans;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ToolRunner;
@@ -37,55 +41,71 @@ import org.apache.mahout.math.decomposer
import org.apache.mahout.math.hadoop.DistributedRowMatrix;
import org.apache.mahout.math.hadoop.decomposer.DistributedLanczosSolver;
import org.apache.mahout.math.hadoop.decomposer.EigenVerificationJob;
+import org.apache.mahout.math.hadoop.stochasticsvd.SSVDSolver;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
/**
* Implementation of the EigenCuts spectral clustering algorithm.
+ *
+ * The input affinity matrix is converted into the normalized Laplacian
+ * L = D^(-0.5) A D^(-0.5), whose top eigenvectors are computed with either
+ * DistributedLanczosSolver (the default) or SSVDSolver (when the --usessvd flag
+ * is given). The rows of the resulting eigenvector matrix are normalized to unit
+ * length and then clustered with k-means.
+ *
+ * Intermediate results are written beneath the temporary directory (the
+ * --tempDir option on the command line); the k-means results are written to the
+ * "kmeans_out" subdirectory of the output path.
*/
public class SpectralKMeansDriver extends AbstractJob {
public static final double OVERSHOOT_MULTIPLIER = 2.0;
+
+  /**
+   * Alias of {@link #OVERSHOOT_MULTIPLIER} under a non-conventional name.
+   *
+   * @deprecated use {@link #OVERSHOOT_MULTIPLIER}; renaming the public constant
+   *             would break existing callers.
+   */
+  @Deprecated
+  public static final double OVERSHOOTMULTIPLIER = OVERSHOOT_MULTIPLIER;
- public static void main(String[] args) throws Exception {
- ToolRunner.run(new SpectralKMeansDriver(), args);
- }
-
- @Override
- public int run(String[] arg0) throws IOException, ClassNotFoundException,
InterruptedException {
- // set up command line options
- Configuration conf = getConf();
- addInputOption();
- addOutputOption();
- addOption("dimensions", "d", "Square dimensions of affinity matrix", true);
- addOption("clusters", "k", "Number of clusters and top eigenvectors",
true);
- addOption(DefaultOptionCreator.distanceMeasureOption().create());
- addOption(DefaultOptionCreator.convergenceOption().create());
- addOption(DefaultOptionCreator.maxIterationsOption().create());
- addOption(DefaultOptionCreator.overwriteOption().create());
- Map<String, List<String>> parsedArgs = parseArguments(arg0);
- if (parsedArgs == null) {
- return 0;
- }
-
- Path input = getInputPath();
- Path output = getOutputPath();
- if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
- HadoopUtil.delete(conf, output);
- }
- int numDims = Integer.parseInt(getOption("dimensions"));
- int clusters = Integer.parseInt(getOption("clusters"));
- String measureClass =
getOption(DefaultOptionCreator.DISTANCE_MEASURE_OPTION);
- DistanceMeasure measure = ClassUtils.instantiateAs(measureClass,
DistanceMeasure.class);
- double convergenceDelta =
Double.parseDouble(getOption(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION));
- int maxIterations =
Integer.parseInt(getOption(DefaultOptionCreator.MAX_ITERATIONS_OPTION));
-
- run(conf, input, output, numDims, clusters, measure, convergenceDelta,
maxIterations);
-
- return 0;
- }
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new SpectralKMeansDriver(), args);
+  }
+
+  /**
+   * Registers and parses the command-line options, then runs spectral k-means.
+   *
+   * @param arg0 the raw command-line arguments
+   * @return always 0, including when {@code parseArguments} returns null
+   */
+  @Override
+  public int run(String[] arg0) throws IOException, ClassNotFoundException, InterruptedException {
+    Configuration conf = getConf();
+    addInputOption();
+    addOutputOption();
+    addOption("dimensions", "d", "Square dimensions of affinity matrix", true);
+    addOption("clusters", "k", "Number of clusters and top eigenvectors", true);
+    addOption(DefaultOptionCreator.distanceMeasureOption().create());
+    addOption(DefaultOptionCreator.convergenceOption().create());
+    addOption(DefaultOptionCreator.maxIterationsOption().create());
+    addOption(DefaultOptionCreator.overwriteOption().create());
+    addFlag("usessvd", "ssvd", "Uses SSVD as the eigensolver. Default is the Lanczos solver.");
+
+    Map<String, List<String>> parsedArgs = parseArguments(arg0);
+    if (parsedArgs == null) {
+      return 0;
+    }
+
+    Path input = getInputPath();
+    Path output = getOutputPath();
+    if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
+      HadoopUtil.delete(conf, output);
+    }
+    int numDims = Integer.parseInt(getOption("dimensions"));
+    int clusters = Integer.parseInt(getOption("clusters"));
+    String measureClass = getOption(DefaultOptionCreator.DISTANCE_MEASURE_OPTION);
+    DistanceMeasure measure = ClassUtils.instantiateAs(measureClass, DistanceMeasure.class);
+    double convergenceDelta =
+        Double.parseDouble(getOption(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION));
+    int maxIterations = Integer.parseInt(getOption(DefaultOptionCreator.MAX_ITERATIONS_OPTION));
+
+    // "tempDir" is not registered in this method; presumably AbstractJob.parseArguments
+    // adds it with a default value -- confirm before relying on a non-null result.
+    Path tempDir = new Path(getOption("tempDir"));
+    // hasOption() resolves the flag's key the same way as the overwrite check above,
+    // instead of hard-coding AbstractJob's "--" key prefix.
+    boolean ssvd = hasOption("usessvd");
+
+    run(conf, input, output, numDims, clusters, measure, convergenceDelta, maxIterations,
+        tempDir, ssvd);
+    return 0;
+  }
/**
* Run the Spectral KMeans clustering on the supplied arguments
@@ -98,95 +118,128 @@ public class SpectralKMeansDriver extend
* @param measure the DistanceMeasure for the k-Means calculations
* @param convergenceDelta the double convergence delta for the k-Means
calculations
* @param maxIterations the int maximum number of iterations for the k-Means
calculations
+ * @param tempDir directory under which intermediate calculations are written
+ * @param ssvd if true, use SSVDSolver for the eigen-decomposition; otherwise use
+ *        DistributedLanczosSolver
*/
- public static void run(Configuration conf,
- Path input,
- Path output,
- int numDims,
- int clusters,
- DistanceMeasure measure,
- double convergenceDelta,
- int maxIterations)
- throws IOException, InterruptedException, ClassNotFoundException {
- // create a few new Paths for temp files and transformations
- Path outputCalc = new Path(output, "calculations");
- Path outputTmp = new Path(output, "temporary");
-
- // Take in the raw CSV text file and split it ourselves,
- // creating our own SequenceFiles for the matrices to read later
- // (similar to the style of syntheticcontrol.canopy.InputMapper)
- Path affSeqFiles = new Path(outputCalc, "seqfile-" + (System.nanoTime() &
0xFF));
- AffinityMatrixInputJob.runJob(input, affSeqFiles, numDims, numDims);
-
- // Next step: construct the affinity matrix using the newly-created
- // sequence files
- DistributedRowMatrix A = new DistributedRowMatrix(affSeqFiles,
- new Path(outputTmp,
"afftmp-" + (System.nanoTime() & 0xFF)),
- numDims,
- numDims);
- Configuration depConf = new Configuration(conf);
- A.setConf(depConf);
-
- // Next step: construct the diagonal matrix D (represented as a vector)
- // and calculate the normalized Laplacian of the form:
- // L = D^(-0.5)AD^(-0.5)
- Vector D = MatrixDiagonalizeJob.runJob(affSeqFiles, numDims);
- DistributedRowMatrix L =
- VectorMatrixMultiplicationJob.runJob(affSeqFiles, D,
- new Path(outputCalc, "laplacian-" + (System.nanoTime() & 0xFF)),
new Path(outputCalc, "laplacian-tmp-" + (System.nanoTime() & 0xFF)));
- L.setConf(depConf);
-
- // Next step: perform eigen-decomposition using LanczosSolver
- // since some of the eigen-output is spurious and will be eliminated
- // upon verification, we have to aim to overshoot and then discard
- // unnecessary vectors later
- int overshoot = (int) ((double) clusters * OVERSHOOT_MULTIPLIER);
- DistributedLanczosSolver solver = new DistributedLanczosSolver();
- LanczosState state = new LanczosState(L, overshoot,
DistributedLanczosSolver.getInitialVector(L));
- Path lanczosSeqFiles = new Path(outputCalc, "eigenvectors-" +
(System.nanoTime() & 0xFF));
- solver.runJob(conf,
- state,
- overshoot,
- true,
- lanczosSeqFiles.toString());
-
- // perform a verification
- EigenVerificationJob verifier = new EigenVerificationJob();
- Path verifiedEigensPath = new Path(outputCalc, "eigenverifier");
- verifier.runJob(conf, lanczosSeqFiles, L.getRowPath(), verifiedEigensPath,
true, 1.0, clusters);
- Path cleanedEigens = verifier.getCleanedEigensPath();
- DistributedRowMatrix W = new DistributedRowMatrix(cleanedEigens, new
Path(cleanedEigens, "tmp"), clusters, numDims);
- W.setConf(depConf);
- DistributedRowMatrix Wtrans = W.transpose();
- // DistributedRowMatrix Wt = W.transpose();
-
- // next step: normalize the rows of Wt to unit length
- Path unitVectors = new Path(outputCalc, "unitvectors-" +
(System.nanoTime() & 0xFF));
- UnitVectorizerJob.runJob(Wtrans.getRowPath(), unitVectors);
- DistributedRowMatrix Wt = new DistributedRowMatrix(unitVectors, new
Path(unitVectors, "tmp"), clusters, numDims);
- Wt.setConf(depConf);
-
- // Finally, perform k-means clustering on the rows of L (or W)
- // generate random initial clusters
- Path initialclusters = RandomSeedGenerator.buildRandom(conf,
- Wt.getRowPath(),
- new Path(output,
Cluster.INITIAL_CLUSTERS_DIR),
- clusters,
- measure);
- // The output format is the same as the K-means output format.
- // TODO: Perhaps a conversion of the output format from points and clusters
- // in eigenspace to the original dataset. Currently, the user has to perform
- // the association step after this job finishes on their own.
- KMeansDriver.run(conf,
- Wt.getRowPath(),
- initialclusters,
- output,
- measure,
- convergenceDelta,
- maxIterations,
- true,
- 0.0,
- false);
- }
+  public static void run(
+      Configuration conf,
+      Path input,
+      Path output,
+      int numDims,
+      int clusters,
+      DistanceMeasure measure,
+      double convergenceDelta,
+      int maxIterations,
+      Path tempDir,
+      boolean ssvd)
+    throws IOException, InterruptedException, ClassNotFoundException {
+    Path outputCalc = new Path(tempDir, "calculations");
+
+    // Every intermediate path below has a fixed name, so leftovers from an earlier
+    // run with the same tempDir would collide with the outputs of the jobs that
+    // follow. This directory is private to this method, so clear it up front.
+    HadoopUtil.delete(conf, outputCalc);
+
+    // Take in the raw CSV text file and split it ourselves, creating our own
+    // SequenceFiles for the matrices to read later
+    // (similar to the style of syntheticcontrol.canopy.InputMapper)
+    Path affSeqFiles = new Path(outputCalc, "seqfile");
+    AffinityMatrixInputJob.runJob(input, affSeqFiles, numDims, numDims);
+
+    Configuration depConf = new Configuration(conf);
+
+    // Construct the diagonal matrix D (represented as a vector)
+    Vector D = MatrixDiagonalizeJob.runJob(affSeqFiles, numDims);
+
+    // Calculate the normalized Laplacian of the form: L = D^(-0.5)AD^(-0.5)
+    DistributedRowMatrix L = VectorMatrixMultiplicationJob.runJob(affSeqFiles, D,
+        new Path(outputCalc, "laplacian"), new Path(outputCalc, "laplacian-tmp"));
+    L.setConf(depConf);
+
+    // One row per input point, one column per retained eigenvector.
+    Path eigenRows = ssvd
+        ? computeEigenvectorsWithSsvd(depConf, L, outputCalc, clusters)
+        : computeEigenvectorsWithLanczos(conf, depConf, L, outputCalc, numDims, clusters);
+
+    // Normalize the rows to unit length; this reduces the occurrence of two
+    // distinct clusters combining into one.
+    Path unitVectors = new Path(outputCalc, "unitvectors");
+    UnitVectorizerJob.runJob(eigenRows, unitVectors);
+
+    // numDims rows (one per point) by at most `clusters` columns.
+    DistributedRowMatrix normalized =
+        new DistributedRowMatrix(unitVectors, new Path(unitVectors, "tmp"), numDims, clusters);
+    normalized.setConf(depConf);
+    Path points = normalized.getRowPath();
+
+    // Generate random initial clusters
+    Path initialClusters = RandomSeedGenerator.buildRandom(conf, points,
+        new Path(output, Cluster.INITIAL_CLUSTERS_DIR), clusters, measure);
+
+    // Run k-means. The clustered points are still in eigenspace; associating them
+    // with the original dataset is left to the caller.
+    Path answer = new Path(output, "kmeans_out");
+    KMeansDriver.run(conf, points, initialClusters, answer,
+        measure, convergenceDelta, maxIterations, true, 0.0, false);
+  }
+
+  /**
+   * Runs spectral k-means with the Lanczos eigensolver and keeps intermediate
+   * results beneath {@code output}, as the previous 8-argument signature did.
+   * Equivalent to calling the 10-argument overload with {@code tempDir = output}
+   * and {@code ssvd = false}; k-means results go to {@code output/kmeans_out}.
+   */
+  public static void run(Configuration conf, Path input, Path output, int numDims,
+      int clusters, DistanceMeasure measure, double convergenceDelta, int maxIterations)
+    throws IOException, InterruptedException, ClassNotFoundException {
+    run(conf, input, output, numDims, clusters, measure, convergenceDelta, maxIterations,
+        output, false);
+  }
+
+  /** Vertical height of a q-block used by SSVDSolver. */
+  private static final int SSVD_ABLOCK_ROWS = 1000;
+  /** Oversampling parameter passed to SSVDSolver. */
+  private static final int SSVD_OVERSAMPLING = 15;
+  /** Last SSVDSolver constructor argument (presumably the reduce-task count; confirm). */
+  private static final int SSVD_REDUCE_TASKS = 10;
+
+  /**
+   * Computes {@code clusters} vectors of the Laplacian with SSVDSolver.
+   *
+   * @return path of the U matrix written by the solver beneath {@code outputCalc/SSVD}
+   */
+  private static Path computeEigenvectorsWithSsvd(Configuration depConf,
+      DistributedRowMatrix laplacian, Path outputCalc, int clusters)
+    throws IOException, InterruptedException, ClassNotFoundException {
+    // SSVDSolver takes an array of input paths; the Laplacian is the only input.
+    Path[] inputPaths = {laplacian.getRowPath()};
+    SSVDSolver solver = new SSVDSolver(depConf, inputPaths, new Path(outputCalc, "SSVD"),
+        SSVD_ABLOCK_ROWS, clusters, SSVD_OVERSAMPLING, SSVD_REDUCE_TASKS);
+    solver.setComputeV(false);
+    solver.setComputeU(true);
+    solver.setOverwrite(true);
+    solver.setQ(0);
+    // NOTE: the broadcast setting is left at SSVDSolver's default; its method doc
+    // says "false" is the default, yet the field is initialized to true.
+    solver.run();
+    return new Path(solver.getUPath());
+  }
+
+  /**
+   * Computes eigenvectors of the Laplacian with DistributedLanczosSolver, discards
+   * the spurious ones with EigenVerificationJob, and transposes the result.
+   *
+   * @return row path of the transposed cleaned-eigenvector matrix
+   */
+  private static Path computeEigenvectorsWithLanczos(Configuration conf, Configuration depConf,
+      DistributedRowMatrix laplacian, Path outputCalc, int numDims, int clusters)
+    throws IOException, InterruptedException, ClassNotFoundException {
+    // Some of the eigen-output is spurious and will be eliminated upon verification,
+    // so aim to overshoot and discard unnecessary vectors later -- but never ask for
+    // more eigenvectors than the matrix has dimensions.
+    int overshoot = Math.min((int) (clusters * OVERSHOOTMULTIPLIER), numDims);
+    DistributedLanczosSolver solver = new DistributedLanczosSolver();
+    LanczosState state = new LanczosState(laplacian, overshoot,
+        DistributedLanczosSolver.getInitialVector(laplacian));
+    Path lanczosSeqFiles = new Path(outputCalc, "eigenvectors");
+    solver.runJob(conf, state, overshoot, true, lanczosSeqFiles.toString());
+
+    // Perform a verification
+    EigenVerificationJob verifier = new EigenVerificationJob();
+    Path verifiedEigensPath = new Path(outputCalc, "eigenverifier");
+    verifier.runJob(conf, lanczosSeqFiles, laplacian.getRowPath(), verifiedEigensPath,
+        true, 1.0, clusters);
+
+    // `clusters` eigenvectors (rows) of length numDims; the transpose has one row per point.
+    Path cleanedEigens = verifier.getCleanedEigensPath();
+    DistributedRowMatrix eigenvectors = new DistributedRowMatrix(
+        cleanedEigens, new Path(cleanedEigens, "tmp"), clusters, numDims);
+    eigenvectors.setConf(depConf);
+    return eigenvectors.transpose().getRowPath();
+  }
}
Modified:
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplaySpectralKMeans.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplaySpectralKMeans.java?rev=1455286&r1=1455285&r2=1455286&view=diff
==============================================================================
---
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplaySpectralKMeans.java
(original)
+++
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplaySpectralKMeans.java
Mon Mar 11 19:06:58 2013
@@ -38,6 +38,7 @@ public class DisplaySpectralKMeans exten
protected static final String SAMPLES = "samples";
protected static final String OUTPUT = "output";
+  // Directory passed to SpectralKMeansDriver.run as its tempDir (intermediate calculations).
+ protected static final String TEMP = "tmp";
protected static final String AFFINITIES = "affinities";
DisplaySpectralKMeans() {
@@ -49,6 +50,7 @@ public class DisplaySpectralKMeans exten
DistanceMeasure measure = new ManhattanDistanceMeasure();
Path samples = new Path(SAMPLES);
Path output = new Path(OUTPUT);
+ Path tempDir = new Path(TEMP);
Configuration conf = new Configuration();
HadoopUtil.delete(conf, samples);
HadoopUtil.delete(conf, output);
@@ -73,7 +75,7 @@ public class DisplaySpectralKMeans exten
}
int maxIter = 10;
double convergenceDelta = 0.001;
- SpectralKMeansDriver.run(new Configuration(), affinities, output,
SAMPLE_DATA.size(), 3, measure, convergenceDelta, maxIter);
+ SpectralKMeansDriver.run(new Configuration(), affinities, output,
SAMPLE_DATA.size(), 3, measure, convergenceDelta, maxIter, tempDir, false);
new DisplaySpectralKMeans();
}