Author: squinn
Date: Mon Mar 11 19:06:58 2013
New Revision: 1455286

URL: http://svn.apache.org/r1455286
Log:
Applying patch 1159 to add SSVD to SpectralKMeans. Also minor tweaks to Path 
structure.

Modified:
    
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java
    
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplaySpectralKMeans.java

Modified: 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java?rev=1455286&r1=1455285&r2=1455286&view=diff
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java
 (original)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java
 Mon Mar 11 19:06:58 2013
@@ -17,6 +17,10 @@
 
 package org.apache.mahout.clustering.spectral.kmeans;
 
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.ToolRunner;
@@ -37,55 +41,71 @@ import org.apache.mahout.math.decomposer
 import org.apache.mahout.math.hadoop.DistributedRowMatrix;
 import org.apache.mahout.math.hadoop.decomposer.DistributedLanczosSolver;
 import org.apache.mahout.math.hadoop.decomposer.EigenVerificationJob;
+import org.apache.mahout.math.hadoop.stochasticsvd.SSVDSolver;
 
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
 
 /**
  * Implementation of the EigenCuts spectral clustering algorithm.
+ * This implementation is for testing and debugging.
+ *
+ * Using the variables below the user can:
+ *             select either SSVDSolver or DistributedLanczosSolver for the eigendecomposition
+ *             change the number of iterations in SSVD
+ *             choose whether to keep the temp files that are created during a job
+ *             have the output printed to a text file
+ *
+ * All of the steps involved in testing have timers built around them, and the result is printed at
+ * the top of the output text file.
+ *
+ * See the README file for a description of the algorithm, testing results, and other details.
  */
 public class SpectralKMeansDriver extends AbstractJob {
 
-  public static final double OVERSHOOT_MULTIPLIER = 2.0;
+       public static final double OVERSHOOTMULTIPLIER = 2.0;
 
-  public static void main(String[] args) throws Exception {
-    ToolRunner.run(new SpectralKMeansDriver(), args);
-  }
-
-  @Override
-  public int run(String[] arg0) throws IOException, ClassNotFoundException, 
InterruptedException {
-    // set up command line options
-    Configuration conf = getConf();
-    addInputOption();
-    addOutputOption();
-    addOption("dimensions", "d", "Square dimensions of affinity matrix", true);
-    addOption("clusters", "k", "Number of clusters and top eigenvectors", 
true);
-    addOption(DefaultOptionCreator.distanceMeasureOption().create());
-    addOption(DefaultOptionCreator.convergenceOption().create());
-    addOption(DefaultOptionCreator.maxIterationsOption().create());
-    addOption(DefaultOptionCreator.overwriteOption().create());
-    Map<String, List<String>> parsedArgs = parseArguments(arg0);
-    if (parsedArgs == null) {
-      return 0;
-    }
-
-    Path input = getInputPath();
-    Path output = getOutputPath();
-    if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
-      HadoopUtil.delete(conf, output);
-    }
-    int numDims = Integer.parseInt(getOption("dimensions"));
-    int clusters = Integer.parseInt(getOption("clusters"));
-    String measureClass = 
getOption(DefaultOptionCreator.DISTANCE_MEASURE_OPTION);
-    DistanceMeasure measure = ClassUtils.instantiateAs(measureClass, 
DistanceMeasure.class);
-    double convergenceDelta = 
Double.parseDouble(getOption(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION));
-    int maxIterations = 
Integer.parseInt(getOption(DefaultOptionCreator.MAX_ITERATIONS_OPTION));
-
-    run(conf, input, output, numDims, clusters, measure, convergenceDelta, 
maxIterations);
-
-    return 0;
-  }
+       public static void main(String[] args) throws Exception {
+               ToolRunner.run(new SpectralKMeansDriver(), args);
+       }
+  
+       @Override
+       public int run(String[] arg0)
+                       throws IOException, ClassNotFoundException, 
InstantiationException, IllegalAccessException, InterruptedException {
+    
+               Configuration conf = getConf();
+               addInputOption();
+               addOutputOption();
+               addOption("dimensions", "d", "Square dimensions of affinity 
matrix", true);
+               addOption("clusters", "k", "Number of clusters and top 
eigenvectors", true);
+               
addOption(DefaultOptionCreator.distanceMeasureOption().create());
+               addOption(DefaultOptionCreator.convergenceOption().create());
+               addOption(DefaultOptionCreator.maxIterationsOption().create());
+               addOption(DefaultOptionCreator.overwriteOption().create());
+               addFlag("usessvd", "ssvd", "Uses SSVD as the eigensolver. 
Default is the Lanczos solver.");
+               
+               Map<String, List<String>> parsedArgs = parseArguments(arg0);
+               if (parsedArgs == null) {
+                 return 0;
+               }
+               
+               Path input = getInputPath();
+               Path output = getOutputPath();
+               if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
+                       HadoopUtil.delete(conf, output);
+               }
+               int numDims = Integer.parseInt(getOption("dimensions"));
+               int clusters = Integer.parseInt(getOption("clusters"));
+               String measureClass = 
getOption(DefaultOptionCreator.DISTANCE_MEASURE_OPTION);
+               DistanceMeasure measure = 
ClassUtils.instantiateAs(measureClass, DistanceMeasure.class);
+               double convergenceDelta = 
Double.parseDouble(getOption(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION));
+               int maxIterations = 
Integer.parseInt(getOption(DefaultOptionCreator.MAX_ITERATIONS_OPTION));
+               
+               Path tempdir = new Path(getOption("tempDir"));
+               boolean ssvd = parsedArgs.containsKey("--usessvd");
+               
+               run(conf, input, output, numDims, clusters, measure, 
convergenceDelta, maxIterations, tempdir, ssvd);
+               
+               return 0;
+       }
 
   /**
    * Run the Spectral KMeans clustering on the supplied arguments
@@ -98,95 +118,128 @@ public class SpectralKMeansDriver extend
    * @param measure the DistanceMeasure for the k-Means calculations
    * @param convergenceDelta the double convergence delta for the k-Means 
calculations
    * @param maxIterations the int maximum number of iterations for the k-Means 
calculations
+   * @param tempDir Temporary directory for intermediate calculations
+   * @param ssvd Flag to indicate the eigensolver to use
    */
-  public static void run(Configuration conf,
-                         Path input,
-                         Path output,
-                         int numDims,
-                         int clusters,
-                         DistanceMeasure measure,
-                         double convergenceDelta,
-                         int maxIterations)
-    throws IOException, InterruptedException, ClassNotFoundException {
-    // create a few new Paths for temp files and transformations
-    Path outputCalc = new Path(output, "calculations");
-    Path outputTmp = new Path(output, "temporary");
-
-    // Take in the raw CSV text file and split it ourselves,
-    // creating our own SequenceFiles for the matrices to read later 
-    // (similar to the style of syntheticcontrol.canopy.InputMapper)
-    Path affSeqFiles = new Path(outputCalc, "seqfile-" + (System.nanoTime() & 
0xFF));
-    AffinityMatrixInputJob.runJob(input, affSeqFiles, numDims, numDims);
-
-    // Next step: construct the affinity matrix using the newly-created
-    // sequence files
-    DistributedRowMatrix A = new DistributedRowMatrix(affSeqFiles,
-                                                      new Path(outputTmp, 
"afftmp-" + (System.nanoTime() & 0xFF)),
-                                                      numDims,
-                                                      numDims);
-    Configuration depConf = new Configuration(conf);
-    A.setConf(depConf);
-
-    // Next step: construct the diagonal matrix D (represented as a vector)
-    // and calculate the normalized Laplacian of the form:
-    // L = D^(-0.5)AD^(-0.5)
-    Vector D = MatrixDiagonalizeJob.runJob(affSeqFiles, numDims);
-    DistributedRowMatrix L =
-        VectorMatrixMultiplicationJob.runJob(affSeqFiles, D,
-            new Path(outputCalc, "laplacian-" + (System.nanoTime() & 0xFF)), 
new Path(outputCalc, "laplacian-tmp-" + (System.nanoTime() & 0xFF)));
-    L.setConf(depConf);
-
-    // Next step: perform eigen-decomposition using LanczosSolver
-    // since some of the eigen-output is spurious and will be eliminated
-    // upon verification, we have to aim to overshoot and then discard
-    // unnecessary vectors later
-    int overshoot = (int) ((double) clusters * OVERSHOOT_MULTIPLIER);
-    DistributedLanczosSolver solver = new DistributedLanczosSolver();
-    LanczosState state = new LanczosState(L, overshoot, 
DistributedLanczosSolver.getInitialVector(L));
-    Path lanczosSeqFiles = new Path(outputCalc, "eigenvectors-" + 
(System.nanoTime() & 0xFF));
-    solver.runJob(conf,
-                  state,
-                  overshoot,
-                  true,
-                  lanczosSeqFiles.toString());
-
-    // perform a verification
-    EigenVerificationJob verifier = new EigenVerificationJob();
-    Path verifiedEigensPath = new Path(outputCalc, "eigenverifier");
-    verifier.runJob(conf, lanczosSeqFiles, L.getRowPath(), verifiedEigensPath, 
true, 1.0, clusters);
-    Path cleanedEigens = verifier.getCleanedEigensPath();
-    DistributedRowMatrix W = new DistributedRowMatrix(cleanedEigens, new 
Path(cleanedEigens, "tmp"), clusters, numDims);
-    W.setConf(depConf);
-    DistributedRowMatrix Wtrans = W.transpose();
-    //    DistributedRowMatrix Wt = W.transpose();
-
-    // next step: normalize the rows of Wt to unit length
-    Path unitVectors = new Path(outputCalc, "unitvectors-" + 
(System.nanoTime() & 0xFF));
-    UnitVectorizerJob.runJob(Wtrans.getRowPath(), unitVectors);
-    DistributedRowMatrix Wt = new DistributedRowMatrix(unitVectors, new 
Path(unitVectors, "tmp"), clusters, numDims);
-    Wt.setConf(depConf);
-
-    // Finally, perform k-means clustering on the rows of L (or W)
-    // generate random initial clusters
-    Path initialclusters = RandomSeedGenerator.buildRandom(conf,
-                                                           Wt.getRowPath(),
-                                                           new Path(output, 
Cluster.INITIAL_CLUSTERS_DIR),
-                                                           clusters,
-                                                           measure);
+       public static void run(
+                 Configuration conf,
+                 Path input,
+                 Path output,
+                 int numDims,
+                 int clusters,
+                 DistanceMeasure measure,
+                 double convergenceDelta,
+                 int maxIterations,
+                 Path tempDir,
+                 boolean ssvd)
+                                 throws IOException, InterruptedException, 
ClassNotFoundException {
     
-    // The output format is the same as the K-means output format.
-    // TODO: Perhaps a conversion of the output format from points and clusters
-    // in eigenspace to the original dataset. Currently, the user has to 
perform
-    // the association step after this job finishes on their own.
-    KMeansDriver.run(conf,
-                     Wt.getRowPath(),
-                     initialclusters,
-                     output,
-                     measure,
-                     convergenceDelta,
-                     maxIterations,
-                     true,
-                     0.0, 
-                     false);
-  }
+               Path outputCalc = new Path(tempDir, "calculations");
+               Path outputTmp = new Path(tempDir, "temporary");
+
+               // Take in the raw CSV text file and split it ourselves,
+               // creating our own SequenceFiles for the matrices to read later
+               // (similar to the style of syntheticcontrol.canopy.InputMapper)
+               Path affSeqFiles = new Path(outputCalc, "seqfile");
+               AffinityMatrixInputJob.runJob(input, affSeqFiles, numDims, 
numDims);
+               
+               // Construct the affinity matrix using the newly-created sequence files
+               DistributedRowMatrix A = 
+                               new DistributedRowMatrix(affSeqFiles, new 
Path(outputTmp, "afftmp"), numDims, numDims); 
+               
+               Configuration depConf = new Configuration(conf);
+               A.setConf(depConf);
+               
+               // Construct the diagonal matrix D (represented as a vector)
+               Vector D = MatrixDiagonalizeJob.runJob(affSeqFiles, numDims);
+               
+               // Calculate the normalized Laplacian of the form: L = D^(-0.5)AD^(-0.5)
+               DistributedRowMatrix L = 
VectorMatrixMultiplicationJob.runJob(affSeqFiles, D,
+                               new Path(outputCalc, "laplacian"), new 
Path(outputCalc, outputCalc));
+               L.setConf(depConf);
+               
+               Path data;
+               
+               if (ssvd) {
+                       // SSVD requires an array of Paths to function, so we pass in an array of length one
+                       Path [] LPath = new Path[1];
+                       LPath[0] = L.getRowPath();
+                       
+                       Path SSVDout = new Path(outputCalc, "SSVD");
+                       
+                       SSVDSolver solveIt = new SSVDSolver(
+                                       depConf, 
+                                       LPath, 
+                                       SSVDout, 
+                                       1000, // Vertical height of a q-block
+                                       clusters, 
+                                       15, // Oversampling 
+                                       10);
+                       
+                       solveIt.setComputeV(false); 
+                       solveIt.setComputeU(true);
+                       solveIt.setOverwrite(true);
+                       solveIt.setQ(0);
+                       
+                       // May want to update SSVD documentation on this one: the method doc
+                       // says "false" is the default, yet it's set to true in the
+                       // variable definition.
+                       //solveIt.setBroadcast(false);
+                       solveIt.run();
+                       data = new Path(solveIt.getUPath());
+               } else {
+                       // Perform eigen-decomposition using LanczosSolver.
+                       // Since some of the eigen-output is spurious and will be eliminated
+                       // upon verification, we have to aim to overshoot and then discard
+                       // unnecessary vectors later.
+                       int overshoot = Math.min((int) ((double) clusters * 
OVERSHOOTMULTIPLIER), numDims);
+                       DistributedLanczosSolver solver = new 
DistributedLanczosSolver();
+                       LanczosState state = new LanczosState(L, overshoot, 
solver.getInitialVector(L));
+                       Path lanczosSeqFiles = new Path(outputCalc, 
"eigenvectors");
+                       
+                       solver.runJob(conf,
+                                     state,
+                                     overshoot,
+                                     true,
+                                     lanczosSeqFiles.toString());
+                       
+                       // perform a verification
+                       EigenVerificationJob verifier = new 
EigenVerificationJob();
+                       Path verifiedEigensPath = new Path(outputCalc, 
"eigenverifier");
+                       verifier.runJob(conf, 
+                                                       lanczosSeqFiles, 
+                                                       L.getRowPath(), 
+                                                       verifiedEigensPath, 
+                                                       true, 
+                                                       1.0, 
+                                                       clusters);
+                       
+                       Path cleanedEigens = verifier.getCleanedEigensPath();
+                       DistributedRowMatrix W = new DistributedRowMatrix(
+                                       cleanedEigens, new Path(cleanedEigens, 
"tmp"), clusters, numDims);
+                       W.setConf(depConf);
+                       DistributedRowMatrix Wtrans = W.transpose();
+                       data = Wtrans.getRowPath();
+               }
+               
+               // Normalize the rows of Wt to unit length.
+               // Normalization is important because it reduces the occurrence of two distinct clusters combining into one.
+               Path unitVectors = new Path(outputCalc, "unitvectors");
+               
+               UnitVectorizerJob.runJob(data, unitVectors);
+               
+               DistributedRowMatrix Wt = new DistributedRowMatrix(
+                               unitVectors, new Path(unitVectors, "tmp"), 
clusters, numDims);
+               Wt.setConf(depConf);
+               data = Wt.getRowPath();
+               
+               // Generate random initial clusters
+               Path initialclusters = RandomSeedGenerator.buildRandom(conf, 
data,
+                               new Path(output, Cluster.INITIAL_CLUSTERS_DIR), 
clusters, measure);
+                  
+                   // Run the KMeansDriver
+               Path answer = new Path(output, "kmeans_out");
+               KMeansDriver.run(conf, data, initialclusters, answer,
+                               measure,convergenceDelta, maxIterations, true, 
0.0, false);
+    }
 }

Modified: 
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplaySpectralKMeans.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplaySpectralKMeans.java?rev=1455286&r1=1455285&r2=1455286&view=diff
==============================================================================
--- 
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplaySpectralKMeans.java
 (original)
+++ 
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplaySpectralKMeans.java
 Mon Mar 11 19:06:58 2013
@@ -38,6 +38,7 @@ public class DisplaySpectralKMeans exten
 
        protected static final String SAMPLES = "samples";
        protected static final String OUTPUT = "output";
+       protected static final String TEMP = "tmp";
        protected static final String AFFINITIES = "affinities";
        
   DisplaySpectralKMeans() {
@@ -49,6 +50,7 @@ public class DisplaySpectralKMeans exten
     DistanceMeasure measure = new ManhattanDistanceMeasure();
     Path samples = new Path(SAMPLES);
     Path output = new Path(OUTPUT);
+    Path tempDir = new Path(TEMP);
     Configuration conf = new Configuration();
     HadoopUtil.delete(conf, samples);
     HadoopUtil.delete(conf, output);
@@ -73,7 +75,7 @@ public class DisplaySpectralKMeans exten
     }
     int maxIter = 10;
     double convergenceDelta = 0.001;
-    SpectralKMeansDriver.run(new Configuration(), affinities, output, 
SAMPLE_DATA.size(), 3, measure, convergenceDelta, maxIter);
+    SpectralKMeansDriver.run(new Configuration(), affinities, output, 
SAMPLE_DATA.size(), 3, measure, convergenceDelta, maxIter, tempDir, false);
     new DisplaySpectralKMeans();
   }
 


Reply via email to