http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/lda/cvb/TopicModel.java
----------------------------------------------------------------------
diff --git 
a/mr/src/main/java/org/apache/mahout/clustering/lda/cvb/TopicModel.java 
b/mr/src/main/java/org/apache/mahout/clustering/lda/cvb/TopicModel.java
new file mode 100644
index 0000000..7b7816c
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/clustering/lda/cvb/TopicModel.java
@@ -0,0 +1,513 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.lda.cvb;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.RandomUtils;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
+import org.apache.mahout.math.DenseMatrix;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.DistributedRowMatrixWriter;
+import org.apache.mahout.math.Matrix;
+import org.apache.mahout.math.MatrixSlice;
+import org.apache.mahout.math.SequentialAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.Vector.Element;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.function.Functions;
+import org.apache.mahout.math.stats.Sampler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Thin wrapper around a {@link Matrix} of counts of occurrences of (topic, 
term) pairs.  Dividing
+ * {code topicTermCount.viewRow(topic).get(term)} by the sum over the values 
for all terms in that
+ * row yields p(term | topic).  Instead dividing it by all topic columns for 
that term yields
+ * p(topic | term).
+ *
+ * Multithreading is enabled for the {@code update(Matrix)} method: this 
method is async, and
+ * merely submits the matrix to a work queue.  When all work has been 
submitted,
+ * {@code awaitTermination()} should be called, which will block until updates 
have been
+ * accumulated.
+ */
+public class TopicModel implements Configurable, Iterable<MatrixSlice> {
+  
+  private static final Logger log = LoggerFactory.getLogger(TopicModel.class);
+  
+  private final String[] dictionary;
+  private final Matrix topicTermCounts;
+  private final Vector topicSums;
+  private final int numTopics;
+  private final int numTerms;
+  private final double eta;
+  private final double alpha;
+
+  private Configuration conf;
+
+  private final Sampler sampler;
+  private final int numThreads;
+  private ThreadPoolExecutor threadPool;
+  private Updater[] updaters;
+
+  public int getNumTerms() {
+    return numTerms;
+  }
+
+  public int getNumTopics() {
+    return numTopics;
+  }
+
+  public TopicModel(int numTopics, int numTerms, double eta, double alpha, 
String[] dictionary,
+      double modelWeight) {
+    this(numTopics, numTerms, eta, alpha, null, dictionary, 1, modelWeight);
+  }
+
+  public TopicModel(Configuration conf, double eta, double alpha,
+      String[] dictionary, int numThreads, double modelWeight, Path... 
modelpath) throws IOException {
+    this(loadModel(conf, modelpath), eta, alpha, dictionary, numThreads, 
modelWeight);
+  }
+
+  public TopicModel(int numTopics, int numTerms, double eta, double alpha, 
String[] dictionary,
+      int numThreads, double modelWeight) {
+    this(new DenseMatrix(numTopics, numTerms), new DenseVector(numTopics), 
eta, alpha, dictionary,
+        numThreads, modelWeight);
+  }
+
+  public TopicModel(int numTopics, int numTerms, double eta, double alpha, 
Random random,
+      String[] dictionary, int numThreads, double modelWeight) {
+    this(randomMatrix(numTopics, numTerms, random), eta, alpha, dictionary, 
numThreads, modelWeight);
+  }
+
+  private TopicModel(Pair<Matrix, Vector> model, double eta, double alpha, 
String[] dict,
+      int numThreads, double modelWeight) {
+    this(model.getFirst(), model.getSecond(), eta, alpha, dict, numThreads, 
modelWeight);
+  }
+
+  public TopicModel(Matrix topicTermCounts, Vector topicSums, double eta, 
double alpha,
+    String[] dictionary, double modelWeight) {
+    this(topicTermCounts, topicSums, eta, alpha, dictionary, 1, modelWeight);
+  }
+
+  public TopicModel(Matrix topicTermCounts, double eta, double alpha, String[] 
dictionary,
+      int numThreads, double modelWeight) {
+    this(topicTermCounts, viewRowSums(topicTermCounts),
+        eta, alpha, dictionary, numThreads, modelWeight);
+  }
+
+  public TopicModel(Matrix topicTermCounts, Vector topicSums, double eta, 
double alpha,
+    String[] dictionary, int numThreads, double modelWeight) {
+    this.dictionary = dictionary;
+    this.topicTermCounts = topicTermCounts;
+    this.topicSums = topicSums;
+    this.numTopics = topicSums.size();
+    this.numTerms = topicTermCounts.numCols();
+    this.eta = eta;
+    this.alpha = alpha;
+    this.sampler = new Sampler(RandomUtils.getRandom());
+    this.numThreads = numThreads;
+    if (modelWeight != 1) {
+      topicSums.assign(Functions.mult(modelWeight));
+      for (int x = 0; x < numTopics; x++) {
+        topicTermCounts.viewRow(x).assign(Functions.mult(modelWeight));
+      }
+    }
+    initializeThreadPool();
+  }
+
+  private static Vector viewRowSums(Matrix m) {
+    Vector v = new DenseVector(m.numRows());
+    for (MatrixSlice slice : m) {
+      v.set(slice.index(), slice.vector().norm(1));
+    }
+    return v;
+  }
+
+  private synchronized void initializeThreadPool() {
+    if (threadPool != null) {
+      threadPool.shutdown();
+      try {
+        threadPool.awaitTermination(100, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        log.error("Could not terminate all threads for TopicModel in time.", 
e);
+      }
+    }
+    threadPool = new ThreadPoolExecutor(numThreads, numThreads, 0, 
TimeUnit.SECONDS,
+                                                           new 
ArrayBlockingQueue<Runnable>(numThreads * 10));
+    threadPool.allowCoreThreadTimeOut(false);
+    updaters = new Updater[numThreads];
+    for (int i = 0; i < numThreads; i++) {
+      updaters[i] = new Updater();
+      threadPool.submit(updaters[i]);
+    }
+  }
+
+  Matrix topicTermCounts() {
+    return topicTermCounts;
+  }
+
+  @Override
+  public Iterator<MatrixSlice> iterator() {
+    return topicTermCounts.iterateAll();
+  }
+
+  public Vector topicSums() {
+    return topicSums;
+  }
+
+  private static Pair<Matrix,Vector> randomMatrix(int numTopics, int numTerms, 
Random random) {
+    Matrix topicTermCounts = new DenseMatrix(numTopics, numTerms);
+    Vector topicSums = new DenseVector(numTopics);
+    if (random != null) {
+      for (int x = 0; x < numTopics; x++) {
+        for (int term = 0; term < numTerms; term++) {
+          topicTermCounts.viewRow(x).set(term, random.nextDouble());
+        }
+      }
+    }
+    for (int x = 0; x < numTopics; x++) {
+      topicSums.set(x, random == null ? 1.0 : 
topicTermCounts.viewRow(x).norm(1));
+    }
+    return Pair.of(topicTermCounts, topicSums);
+  }
+
+  public static Pair<Matrix, Vector> loadModel(Configuration conf, Path... 
modelPaths)
+    throws IOException {
+    int numTopics = -1;
+    int numTerms = -1;
+    List<Pair<Integer, Vector>> rows = Lists.newArrayList();
+    for (Path modelPath : modelPaths) {
+      for (Pair<IntWritable, VectorWritable> row
+          : new SequenceFileIterable<IntWritable, VectorWritable>(modelPath, 
true, conf)) {
+        rows.add(Pair.of(row.getFirst().get(), row.getSecond().get()));
+        numTopics = Math.max(numTopics, row.getFirst().get());
+        if (numTerms < 0) {
+          numTerms = row.getSecond().get().size();
+        }
+      }
+    }
+    if (rows.isEmpty()) {
+      throw new IOException(Arrays.toString(modelPaths) + " have no vectors in 
it");
+    }
+    numTopics++;
+    Matrix model = new DenseMatrix(numTopics, numTerms);
+    Vector topicSums = new DenseVector(numTopics);
+    for (Pair<Integer, Vector> pair : rows) {
+      model.viewRow(pair.getFirst()).assign(pair.getSecond());
+      topicSums.set(pair.getFirst(), pair.getSecond().norm(1));
+    }
+    return Pair.of(model, topicSums);
+  }
+
+  // NOTE: this is purely for debug purposes.  It is not performant to 
"toString()" a real model
+  @Override
+  public String toString() {
+    StringBuilder buf = new StringBuilder();
+    for (int x = 0; x < numTopics; x++) {
+      String v = dictionary != null
+          ? vectorToSortedString(topicTermCounts.viewRow(x).normalize(1), 
dictionary)
+          : topicTermCounts.viewRow(x).asFormatString();
+      buf.append(v).append('\n');
+    }
+    return buf.toString();
+  }
+
+  public int sampleTerm(Vector topicDistribution) {
+    return 
sampler.sample(topicTermCounts.viewRow(sampler.sample(topicDistribution)));
+  }
+
+  public int sampleTerm(int topic) {
+    return sampler.sample(topicTermCounts.viewRow(topic));
+  }
+
+  public synchronized void reset() {
+    for (int x = 0; x < numTopics; x++) {
+      topicTermCounts.assignRow(x, new SequentialAccessSparseVector(numTerms));
+    }
+    topicSums.assign(1.0);
+    if (threadPool.isTerminated()) {
+      initializeThreadPool();
+    }
+  }
+
+  public synchronized void stop() {
+    for (Updater updater : updaters) {
+      updater.shutdown();
+    }
+    threadPool.shutdown();
+    try {
+      if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
+        log.warn("Threadpool timed out on await termination - jobs still 
running!");
+      }
+    } catch (InterruptedException e) {
+      log.error("Interrupted shutting down!", e);
+    }
+  }
+
+  public void renormalize() {
+    for (int x = 0; x < numTopics; x++) {
+      topicTermCounts.assignRow(x, topicTermCounts.viewRow(x).normalize(1));
+      topicSums.assign(1.0);
+    }
+  }
+
+  public void trainDocTopicModel(Vector original, Vector topics, Matrix 
docTopicModel) {
+    // first calculate p(topic|term,document) for all terms in original, and 
all topics,
+    // using p(term|topic) and p(topic|doc)
+    pTopicGivenTerm(original, topics, docTopicModel);
+    normalizeByTopic(docTopicModel);
+    // now multiply, term-by-term, by the document, to get the weighted 
distribution of
+    // term-topic pairs from this document.
+    for (Element e : original.nonZeroes()) {
+      for (int x = 0; x < numTopics; x++) {
+        Vector docTopicModelRow = docTopicModel.viewRow(x);
+        docTopicModelRow.setQuick(e.index(), 
docTopicModelRow.getQuick(e.index()) * e.get());
+      }
+    }
+    // now recalculate \(p(topic|doc)\) by summing contributions from all of 
pTopicGivenTerm
+    topics.assign(0.0);
+    for (int x = 0; x < numTopics; x++) {
+      topics.set(x, docTopicModel.viewRow(x).norm(1));
+    }
+    // now renormalize so that \(sum_x(p(x|doc))\) = 1
+    topics.assign(Functions.mult(1 / topics.norm(1)));
+  }
+
+  public Vector infer(Vector original, Vector docTopics) {
+    Vector pTerm = original.like();
+    for (Element e : original.nonZeroes()) {
+      int term = e.index();
+      // p(a) = sum_x (p(a|x) * p(x|i))
+      double pA = 0;
+      for (int x = 0; x < numTopics; x++) {
+        pA += (topicTermCounts.viewRow(x).get(term) / topicSums.get(x)) * 
docTopics.get(x);
+      }
+      pTerm.set(term, pA);
+    }
+    return pTerm;
+  }
+
+  public void update(Matrix docTopicCounts) {
+    for (int x = 0; x < numTopics; x++) {
+      updaters[x % updaters.length].update(x, docTopicCounts.viewRow(x));
+    }
+  }
+
+  public void updateTopic(int topic, Vector docTopicCounts) {
+    topicTermCounts.viewRow(topic).assign(docTopicCounts, Functions.PLUS);
+    topicSums.set(topic, topicSums.get(topic) + docTopicCounts.norm(1));
+  }
+
+  public void update(int termId, Vector topicCounts) {
+    for (int x = 0; x < numTopics; x++) {
+      Vector v = topicTermCounts.viewRow(x);
+      v.set(termId, v.get(termId) + topicCounts.get(x));
+    }
+    topicSums.assign(topicCounts, Functions.PLUS);
+  }
+
+  public void persist(Path outputDir, boolean overwrite) throws IOException {
+    FileSystem fs = outputDir.getFileSystem(conf);
+    if (overwrite) {
+      fs.delete(outputDir, true); // CHECK second arg
+    }
+    DistributedRowMatrixWriter.write(outputDir, conf, topicTermCounts);
+  }
+
+  /**
+   * Computes {@code \(p(topic x | term a, document i)\)} distributions given 
input document {@code i}.
+   * {@code \(pTGT[x][a]\)} is the (un-normalized) {@code \(p(x|a,i)\)}, or if 
docTopics is {@code null},
+   * {@code \(p(a|x)\)} (also un-normalized).
+   *
+   * @param document doc-term vector encoding {@code \(w(term a|document i)\)}.
+   * @param docTopics {@code docTopics[x]} is the overall weight of topic 
{@code x} in given
+   *          document. If {@code null}, a topic weight of {@code 1.0} is used 
for all topics.
+   * @param termTopicDist storage for output {@code \(p(x|a,i)\)} 
distributions.
+   */
+  private void pTopicGivenTerm(Vector document, Vector docTopics, Matrix 
termTopicDist) {
+    // for each topic x
+    for (int x = 0; x < numTopics; x++) {
+      // get p(topic x | document i), or 1.0 if docTopics is null
+      double topicWeight = docTopics == null ? 1.0 : docTopics.get(x);
+      // get w(term a | topic x)
+      Vector topicTermRow = topicTermCounts.viewRow(x);
+      // get \sum_a w(term a | topic x)
+      double topicSum = topicSums.get(x);
+      // get p(topic x | term a) distribution to update
+      Vector termTopicRow = termTopicDist.viewRow(x);
+
+      // for each term a in document i with non-zero weight
+      for (Element e : document.nonZeroes()) {
+        int termIndex = e.index();
+
+        // calc un-normalized p(topic x | term a, document i)
+        double termTopicLikelihood = (topicTermRow.get(termIndex) + eta) * 
(topicWeight + alpha)
+            / (topicSum + eta * numTerms);
+        termTopicRow.set(termIndex, termTopicLikelihood);
+      }
+    }
+  }
+
+  /**
+   * \(sum_x sum_a (c_ai * log(p(x|i) * p(a|x)))\)
+   */
+  public double perplexity(Vector document, Vector docTopics) {
+    double perplexity = 0;
+    double norm = docTopics.norm(1) + (docTopics.size() * alpha);
+    for (Element e : document.nonZeroes()) {
+      int term = e.index();
+      double prob = 0;
+      for (int x = 0; x < numTopics; x++) {
+        double d = (docTopics.get(x) + alpha) / norm;
+        double p = d * (topicTermCounts.viewRow(x).get(term) + eta)
+                   / (topicSums.get(x) + eta * numTerms);
+        prob += p;
+      }
+      perplexity += e.get() * Math.log(prob);
+    }
+    return -perplexity;
+  }
+
+  private void normalizeByTopic(Matrix perTopicSparseDistributions) {
+    // then make sure that each of these is properly normalized by topic: 
sum_x(p(x|t,d)) = 1
+    for (Element e : perTopicSparseDistributions.viewRow(0).nonZeroes()) {
+      int a = e.index();
+      double sum = 0;
+      for (int x = 0; x < numTopics; x++) {
+        sum += perTopicSparseDistributions.viewRow(x).get(a);
+      }
+      for (int x = 0; x < numTopics; x++) {
+        perTopicSparseDistributions.viewRow(x).set(a,
+            perTopicSparseDistributions.viewRow(x).get(a) / sum);
+      }
+    }
+  }
+
+  public static String vectorToSortedString(Vector vector, String[] 
dictionary) {
+    List<Pair<String,Double>> vectorValues = 
Lists.newArrayListWithCapacity(vector.getNumNondefaultElements());
+    for (Element e : vector.nonZeroes()) {
+      vectorValues.add(Pair.of(dictionary != null ? dictionary[e.index()] : 
String.valueOf(e.index()),
+                               e.get()));
+    }
+    Collections.sort(vectorValues, new Comparator<Pair<String, Double>>() {
+      @Override public int compare(Pair<String, Double> x, Pair<String, 
Double> y) {
+        return y.getSecond().compareTo(x.getSecond());
+      }
+    });
+    Iterator<Pair<String,Double>> listIt = vectorValues.iterator();
+    StringBuilder bldr = new StringBuilder(2048);
+    bldr.append('{');
+    int i = 0;
+    while (listIt.hasNext() && i < 25) {
+      i++;
+      Pair<String,Double> p = listIt.next();
+      bldr.append(p.getFirst());
+      bldr.append(':');
+      bldr.append(p.getSecond());
+      bldr.append(',');
+    }
+    if (bldr.length() > 1) {
+      bldr.setCharAt(bldr.length() - 1, '}');
+    }
+    return bldr.toString();
+  }
+
+  @Override
+  public void setConf(Configuration configuration) {
+    this.conf = configuration;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  private final class Updater implements Runnable {
+    private final ArrayBlockingQueue<Pair<Integer, Vector>> queue =
+        new ArrayBlockingQueue<>(100);
+    private boolean shutdown = false;
+    private boolean shutdownComplete = false;
+
+    public void shutdown() {
+      try {
+        synchronized (this) {
+          while (!shutdownComplete) {
+            shutdown = true;
+            wait(10000L); // Arbitrarily, wait 10 seconds rather than forever 
for this
+          }
+        }
+      } catch (InterruptedException e) {
+        log.warn("Interrupted waiting to shutdown() : ", e);
+      }
+    }
+
+    public boolean update(int topic, Vector v) {
+      if (shutdown) { // maybe don't do this?
+        throw new IllegalStateException("In SHUTDOWN state: cannot submit 
tasks");
+      }
+      while (true) { // keep trying if interrupted
+        try {
+          // start async operation by submitting to the queue
+          queue.put(Pair.of(topic, v));
+          // return once you got access to the queue
+          return true;
+        } catch (InterruptedException e) {
+          log.warn("Interrupted trying to queue update:", e);
+        }
+      }
+    }
+
+    @Override
+    public void run() {
+      while (!shutdown) {
+        try {
+          Pair<Integer, Vector> pair = queue.poll(1, TimeUnit.SECONDS);
+          if (pair != null) {
+            updateTopic(pair.getFirst(), pair.getSecond());
+          }
+        } catch (InterruptedException e) {
+          log.warn("Interrupted waiting to poll for update", e);
+        }
+      }
+      // in shutdown mode, finish remaining tasks!
+      for (Pair<Integer, Vector> pair : queue) {
+        updateTopic(pair.getFirst(), pair.getSecond());
+      }
+      synchronized (this) {
+        shutdownComplete = true;
+        notifyAll();
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/package-info.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/clustering/package-info.java 
b/mr/src/main/java/org/apache/mahout/clustering/package-info.java
new file mode 100644
index 0000000..9926b91
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/clustering/package-info.java
@@ -0,0 +1,13 @@
+/**
+ * <p>This package provides several clustering algorithm implementations. Clustering usually groups a set of
+ * objects into groups of similar items. The definition of similarity is usually up to you - for text documents,
+ * cosine-distance/-similarity is recommended. Mahout also features other types of distance measures, such as
+ * Euclidean distance.</p>
+ *
+ * <p>Input of each clustering algorithm is a set of vectors representing your items. For texts in general these are
+ * <a href="http://en.wikipedia.org/wiki/TFIDF">TFIDF</a> or
+ * <a href="http://en.wikipedia.org/wiki/Bag_of_words">Bag of words</a> representations of the documents.</p>
+ *
+ * <p>Output of each clustering algorithm is either a hard or soft assignment 
of items to clusters.</p>
+ */
+package org.apache.mahout.clustering;

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/spectral/AffinityMatrixInputJob.java
----------------------------------------------------------------------
diff --git 
a/mr/src/main/java/org/apache/mahout/clustering/spectral/AffinityMatrixInputJob.java
 
b/mr/src/main/java/org/apache/mahout/clustering/spectral/AffinityMatrixInputJob.java
new file mode 100644
index 0000000..aa12b9e
--- /dev/null
+++ 
b/mr/src/main/java/org/apache/mahout/clustering/spectral/AffinityMatrixInputJob.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.hadoop.DistributedRowMatrix;
+
+public final class AffinityMatrixInputJob {
+
+  /** Static entry points only; not instantiable. */
+  private AffinityMatrixInputJob() {
+  }
+
+  /**
+   * Configures and runs the map/reduce job that reads affinity-matrix entries in
+   * (x_i, x_j, value) format from {@code input} and writes row vectors to {@code output}.
+   * Anything already at {@code output} is deleted first.
+   *
+   * @param input location of the textual affinity entries
+   * @param output destination of the sequence-file rows
+   * @param rows published to the job configuration as the affinity dimension
+   * @param cols not read by this method
+   * @throws IllegalStateException if the job does not complete successfully
+   */
+  public static void runJob(Path input, Path output, int rows, int cols)
+    throws IOException, InterruptedException, ClassNotFoundException {
+    Configuration conf = new Configuration();
+    HadoopUtil.delete(conf, output);
+
+    // must be set before the Job is created from this configuration
+    conf.setInt(Keys.AFFINITY_DIMENSIONS, rows);
+    String jobName = "AffinityMatrixInputJob: " + input + " -> M/R -> " + output;
+    Job job = new Job(conf, jobName);
+    job.setJarByClass(AffinityMatrixInputJob.class);
+
+    // map side: (row index, matrix entry)
+    job.setMapperClass(AffinityMatrixInputMapper.class);
+    job.setMapOutputKeyClass(IntWritable.class);
+    job.setMapOutputValueClass(DistributedRowMatrix.MatrixEntryWritable.class);
+
+    // reduce side: (row index, row vector), stored as a sequence file
+    job.setReducerClass(AffinityMatrixInputReducer.class);
+    job.setOutputKeyClass(IntWritable.class);
+    job.setOutputValueClass(VectorWritable.class);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+
+    FileInputFormat.addInputPath(job, input);
+    FileOutputFormat.setOutputPath(job, output);
+
+    if (!job.waitForCompletion(true)) {
+      throw new IllegalStateException("Job failed!");
+    }
+  }
+
+  /**
+   * Convenience wrapper around {@link #runJob(Path, Path, int, int)}: picks a "seqfiles-N"
+   * directory under {@code output}, runs the job into it and returns a square
+   * {@link DistributedRowMatrix} of the given dimension over the result, with a fresh
+   * {@link Configuration} attached.
+   */
+  public static DistributedRowMatrix runJob(Path input, Path output, int dimensions)
+    throws IOException, InterruptedException, ClassNotFoundException {
+    long seqSuffix = System.nanoTime() & 0xFF;
+    Path seqFiles = new Path(output, "seqfiles-" + seqSuffix);
+    runJob(input, seqFiles, dimensions, dimensions);
+
+    long tmpSuffix = System.nanoTime() & 0xFF;
+    Path tmpPath = new Path(seqFiles, "seqtmp-" + tmpSuffix);
+    DistributedRowMatrix affinity = new DistributedRowMatrix(seqFiles, tmpPath, dimensions, dimensions);
+    affinity.setConf(new Configuration());
+    return affinity;
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/spectral/AffinityMatrixInputMapper.java
----------------------------------------------------------------------
diff --git 
a/mr/src/main/java/org/apache/mahout/clustering/spectral/AffinityMatrixInputMapper.java
 
b/mr/src/main/java/org/apache/mahout/clustering/spectral/AffinityMatrixInputMapper.java
new file mode 100644
index 0000000..30d2404
--- /dev/null
+++ 
b/mr/src/main/java/org/apache/mahout/clustering/spectral/AffinityMatrixInputMapper.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral;
+
+import java.io.IOException;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.math.hadoop.DistributedRowMatrix;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>Handles reading the files representing the affinity matrix. Since the affinity
+ * matrix is representative of a graph, each line in all the files should
+ * take the form:</p>
+ *
+ * {@code i,j,value}
+ *
+ * <p>where {@code i} and {@code j} are the {@code i}th and
+ * {@code j}th data points in the entire set, and {@code value}
+ * represents some measurement of their relative absolute magnitudes. This
+ * is, simply, a method for representing a graph textually.</p>
+ */
+public class AffinityMatrixInputMapper
+    extends Mapper<LongWritable, Text, IntWritable, DistributedRowMatrix.MatrixEntryWritable> {
+
+  private static final Logger log = LoggerFactory.getLogger(AffinityMatrixInputMapper.class);
+
+  private static final Pattern COMMA_PATTERN = Pattern.compile(",");
+
+  /**
+   * Parses one {@code i,j,value} line and emits the entry keyed by its row index {@code i}.
+   *
+   * @throws IOException if the line does not have exactly three non-empty comma-separated
+   *         fields, or if a field cannot be parsed as a number
+   */
+  @Override
+  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+
+    String[] elements = COMMA_PATTERN.split(value.toString());
+    log.debug("(DEBUG - MAP) Key[{}], Value[{}]", key.get(), value);
+
+    // enforce well-formed textual representation of the graph
+    if (elements.length != 3) {
+      throw new IOException("Expected input of length 3, received "
+                            + elements.length + ". Please make sure you adhere to "
+                            + "the structure of (i,j,value) for representing a graph in text. "
+                            + "Input line was: '" + value + "'.");
+    }
+    if (elements[0].isEmpty() || elements[1].isEmpty() || elements[2].isEmpty()) {
+      throw new IOException("Found an element of 0 length. Please be sure you adhere to the structure of "
+          + "(i,j,value) for  representing a graph in text.");
+    }
+
+    // parse to primitives (no boxing); report unparseable fields like the other
+    // malformed-input cases instead of letting a bare NumberFormatException escape
+    int rowIndex;
+    int colIndex;
+    double entryValue;
+    try {
+      rowIndex = Integer.parseInt(elements[0]);
+      colIndex = Integer.parseInt(elements[1]);
+      entryValue = Double.parseDouble(elements[2]);
+    } catch (NumberFormatException e) {
+      throw new IOException("Could not parse the numeric fields of (i,j,value). "
+                            + "Input line was: '" + value + "'.", e);
+    }
+
+    // build a DistributedRowMatrix entry, making the row (elements[0]) the key
+    // to the Reducer, and setting the column (elements[1]) in the entry itself
+    DistributedRowMatrix.MatrixEntryWritable toAdd = new DistributedRowMatrix.MatrixEntryWritable();
+    IntWritable row = new IntWritable(rowIndex);
+    toAdd.setRow(-1); // already set as the Reducer's key
+    toAdd.setCol(colIndex);
+    toAdd.setVal(entryValue);
+    context.write(row, toAdd);
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/spectral/AffinityMatrixInputReducer.java
----------------------------------------------------------------------
diff --git 
a/mr/src/main/java/org/apache/mahout/clustering/spectral/AffinityMatrixInputReducer.java
 
b/mr/src/main/java/org/apache/mahout/clustering/spectral/AffinityMatrixInputReducer.java
new file mode 100644
index 0000000..d892969
--- /dev/null
+++ 
b/mr/src/main/java/org/apache/mahout/clustering/spectral/AffinityMatrixInputReducer.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.SequentialAccessSparseVector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.hadoop.DistributedRowMatrix;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Collects the DistributedRowMatrix entries that share a row key into a single
+ * vector for that row. Each entry is stored at the position given by its column,
+ * and the finished vector is written out under the same key it arrived with, so
+ * input and output keys both identify the row of the resulting matrix.
+ */
+public class AffinityMatrixInputReducer
+    extends Reducer<IntWritable, DistributedRowMatrix.MatrixEntryWritable, IntWritable, VectorWritable> {
+
+  private static final Logger log = LoggerFactory.getLogger(AffinityMatrixInputReducer.class);
+
+  /**
+   * Assembles one matrix row from its entries and emits it as a sequential-access sparse vector.
+   * The vector's cardinality is read from the job configuration, falling back to
+   * {@code Integer.MAX_VALUE} when the key is absent.
+   */
+  @Override
+  protected void reduce(IntWritable row, Iterable<DistributedRowMatrix.MatrixEntryWritable> values, Context context)
+    throws IOException, InterruptedException {
+    int dimensions = context.getConfiguration().getInt(Keys.AFFINITY_DIMENSIONS, Integer.MAX_VALUE);
+    RandomAccessSparseVector rowEntries = new RandomAccessSparseVector(dimensions, 100);
+
+    for (DistributedRowMatrix.MatrixEntryWritable entry : values) {
+      int column = entry.getCol();
+      double weight = entry.getVal();
+      rowEntries.setQuick(column, weight);
+      if (log.isDebugEnabled()) {
+        log.debug("(DEBUG - REDUCE) Row[{}], Column[{}], Value[{}]",
+                  row.get(), column, weight);
+      }
+    }
+    // convert to the sequential-access representation before writing
+    context.write(row, new VectorWritable(new SequentialAccessSparseVector(rowEntries)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/spectral/IntDoublePairWritable.java
----------------------------------------------------------------------
diff --git 
a/mr/src/main/java/org/apache/mahout/clustering/spectral/IntDoublePairWritable.java
 
b/mr/src/main/java/org/apache/mahout/clustering/spectral/IntDoublePairWritable.java
new file mode 100644
index 0000000..593cc58
--- /dev/null
+++ 
b/mr/src/main/java/org/apache/mahout/clustering/spectral/IntDoublePairWritable.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A Writable holding one (int, double) pair. It stands in for the generic
+ * mahout.common.Pair class: a generic pair could only be Writable if both of
+ * its type parameters were Writable too, so a dedicated class for this
+ * combination was the simpler choice.
+ *
+ * In essence, an instance can be treated as a single Vector Element.
+ */
+public class IntDoublePairWritable implements Writable {
+
+  private int key;
+  private double value;
+
+  /** Creates an empty pair; the fields are filled in later by the setters or by {@link #readFields}. */
+  public IntDoublePairWritable() {
+  }
+
+  /**
+   * @param k the int half of the pair
+   * @param v the double half of the pair
+   */
+  public IntDoublePairWritable(int k, double v) {
+    key = k;
+    value = v;
+  }
+
+  public int getKey() {
+    return key;
+  }
+
+  public void setKey(int k) {
+    key = k;
+  }
+
+  public double getValue() {
+    return value;
+  }
+
+  public void setValue(double v) {
+    value = v;
+  }
+
+  /** Serializes the pair as an int followed by a double. */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(key);
+    out.writeDouble(value);
+  }
+
+  /** Restores the pair from an int followed by a double, the order used by {@link #write}. */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    key = in.readInt();
+    value = in.readDouble();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/spectral/Keys.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/clustering/spectral/Keys.java 
b/mr/src/main/java/org/apache/mahout/clustering/spectral/Keys.java
new file mode 100644
index 0000000..268a365
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/clustering/spectral/Keys.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral;
+
+/**
+ * Constants shared by the classes of the spectral clustering package.
+ * The class only holds constants and cannot be instantiated.
+ */
+public final class Keys {
+
+  /**
+   * Sets the SequenceFile index for the diagonal matrix.
+   */
+  public static final int DIAGONAL_CACHE_INDEX = 1;
+
+  /**
+   * Name of the configuration property that carries the dimension of the affinity matrix.
+   */
+  public static final String AFFINITY_DIMENSIONS = "org.apache.mahout.clustering.spectral.common.affinitydimensions";
+
+  private Keys() {}
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/spectral/MatrixDiagonalizeJob.java
----------------------------------------------------------------------
diff --git 
a/mr/src/main/java/org/apache/mahout/clustering/spectral/MatrixDiagonalizeJob.java
 
b/mr/src/main/java/org/apache/mahout/clustering/spectral/MatrixDiagonalizeJob.java
new file mode 100644
index 0000000..f245f99
--- /dev/null
+++ 
b/mr/src/main/java/org/apache/mahout/clustering/spectral/MatrixDiagonalizeJob.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+/**
+ * Map/reduce job that collapses a matrix into a single vector: element i of
+ * the result holds the sum of all entries found in row i of the input matrix.
+ */
+public final class MatrixDiagonalizeJob {
+
+  private MatrixDiagonalizeJob() {
+  }
+
+  /**
+   * Runs the job and reads the resulting vector back.
+   *
+   * @param affInput   path of the input matrix rows; the job writes to a sibling directory named "diagonal"
+   * @param dimensions length of the vector to produce
+   * @return the vector of row sums
+   * @throws IllegalStateException if the job does not complete successfully
+   */
+  public static Vector runJob(Path affInput, int dimensions)
+    throws IOException, ClassNotFoundException, InterruptedException {
+
+    Configuration conf = new Configuration();
+
+    // remove whatever an earlier run may have left in the output directory
+    Path diagOutput = new Path(affInput.getParent(), "diagonal");
+    HadoopUtil.delete(conf, diagOutput);
+
+    // the reducer reads the vector length from the configuration
+    conf.setInt(Keys.AFFINITY_DIMENSIONS, dimensions);
+
+    Job job = new Job(conf, "MatrixDiagonalizeJob");
+    job.setJarByClass(MatrixDiagonalizeJob.class);
+
+    // input side
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    FileInputFormat.addInputPath(job, affInput);
+
+    // map stage
+    job.setMapperClass(MatrixDiagonalizeMapper.class);
+    job.setMapOutputKeyClass(NullWritable.class);
+    job.setMapOutputValueClass(IntDoublePairWritable.class);
+
+    // reduce stage and output side
+    job.setReducerClass(MatrixDiagonalizeReducer.class);
+    job.setOutputKeyClass(NullWritable.class);
+    job.setOutputValueClass(VectorWritable.class);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    FileOutputFormat.setOutputPath(job, diagOutput);
+
+    if (!job.waitForCompletion(true)) {
+      throw new IllegalStateException("Job failed!");
+    }
+
+    // the vector is read back from the first reducer output file
+    Path resultFile = new Path(diagOutput, "part-r-00000");
+    return VectorCache.load(conf, resultFile);
+  }
+
+  /**
+   * Emits, for every input row, the pair (row index, sum of the row's elements).
+   * All pairs share the single NullWritable key.
+   */
+  public static class MatrixDiagonalizeMapper
+    extends Mapper<IntWritable, VectorWritable, NullWritable, IntDoublePairWritable> {
+
+    @Override
+    protected void map(IntWritable key, VectorWritable row, Context context)
+      throws IOException, InterruptedException {
+      int rowIndex = key.get();
+      double rowSum = row.get().zSum();
+      context.write(NullWritable.get(), new IntDoublePairWritable(rowIndex, rowSum));
+    }
+  }
+
+  /**
+   * Places every (row index, row sum) pair into one dense vector and writes that vector out.
+   */
+  public static class MatrixDiagonalizeReducer
+    extends Reducer<NullWritable, IntDoublePairWritable, NullWritable, VectorWritable> {
+
+    @Override
+    protected void reduce(NullWritable key, Iterable<IntDoublePairWritable> values,
+      Context context) throws IOException, InterruptedException {
+      // vector length comes from the configuration; Integer.MAX_VALUE is the fallback
+      int length = context.getConfiguration().getInt(Keys.AFFINITY_DIMENSIONS, Integer.MAX_VALUE);
+      Vector diagonal = new DenseVector(length);
+      for (IntDoublePairWritable pair : values) {
+        diagonal.setQuick(pair.getKey(), pair.getValue());
+      }
+      context.write(key, new VectorWritable(diagonal));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/spectral/UnitVectorizerJob.java
----------------------------------------------------------------------
diff --git 
a/mr/src/main/java/org/apache/mahout/clustering/spectral/UnitVectorizerJob.java 
b/mr/src/main/java/org/apache/mahout/clustering/spectral/UnitVectorizerJob.java
new file mode 100644
index 0000000..56cb237
--- /dev/null
+++ 
b/mr/src/main/java/org/apache/mahout/clustering/spectral/UnitVectorizerJob.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.mahout.math.VectorWritable;
+
+/**
+ * <p>Map-only job that rescales every row of a DistributedRowMatrix to unit
+ * length. With U as the input matrix and V as the output matrix, each
+ * element is computed as:</p>
+ *
+ * <p>{@code v_ij = u_ij / sqrt(sum_j(u_ij * u_ij))}</p>
+ */
+public final class UnitVectorizerJob {
+
+  private UnitVectorizerJob() {
+  }
+
+  /**
+   * Runs the normalization job.
+   *
+   * @param input  path of the rows to normalize
+   * @param output path the normalized rows are written to
+   * @throws IllegalStateException if the job does not complete successfully
+   */
+  public static void runJob(Path input, Path output)
+    throws IOException, InterruptedException, ClassNotFoundException {
+
+    Job job = new Job(new Configuration(), "UnitVectorizerJob");
+    job.setJarByClass(UnitVectorizerJob.class);
+
+    // input side
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    FileInputFormat.addInputPath(job, input);
+
+    // no reduce phase: the mapper output is the job output
+    job.setMapperClass(UnitVectorizerMapper.class);
+    job.setNumReduceTasks(0);
+
+    // output side
+    job.setOutputKeyClass(IntWritable.class);
+    job.setOutputValueClass(VectorWritable.class);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    FileOutputFormat.setOutputPath(job, output);
+
+    if (!job.waitForCompletion(true)) {
+      throw new IllegalStateException("Job failed!");
+    }
+  }
+
+  /**
+   * Writes each row back under its original key after normalizing it with power 2.
+   */
+  public static class UnitVectorizerMapper
+    extends Mapper<IntWritable, VectorWritable, IntWritable, VectorWritable> {
+
+    @Override
+    protected void map(IntWritable row, VectorWritable vector, Context context)
+      throws IOException, InterruptedException {
+      VectorWritable unitRow = new VectorWritable(vector.get().normalize(2));
+      context.write(row, unitRow);
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/spectral/VectorCache.java
----------------------------------------------------------------------
diff --git 
a/mr/src/main/java/org/apache/mahout/clustering/spectral/VectorCache.java 
b/mr/src/main/java/org/apache/mahout/clustering/spectral/VectorCache.java
new file mode 100644
index 0000000..60e0a2e
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/clustering/spectral/VectorCache.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.HadoopUtil;
+import 
org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterator;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This class handles reading and writing vectors to the Hadoop
+ * distributed cache. Created as a result of Eigencuts' liberal use
+ * of such functionality, but available to any algorithm requiring it.
+ */
+public final class VectorCache {
+
+  private static final Logger log = LoggerFactory.getLogger(VectorCache.class);
+
+  private VectorCache() {
+  }
+
+  /**
+   * Writes the vector to a SequenceFile at {@code output} and registers that
+   * file as the only cache file of {@code conf}.
+   *
+   * @param key           SequenceFile key; the file is created with IntWritable as its key class,
+   *                      so an IntWritable is expected here
+   * @param vector        Vector to save, to be wrapped as VectorWritable
+   * @param output        location of the SequenceFile to write
+   * @param conf          configuration that receives the cache file entry
+   * @param overwritePath if true, anything already at {@code output} is deleted first
+   * @param deleteOnExit  if true, {@code output} is marked for deletion when the file system is closed
+   */
+  public static void save(Writable key,
+                          Vector vector,
+                          Path output,
+                          Configuration conf,
+                          boolean overwritePath,
+                          boolean deleteOnExit) throws IOException {
+
+    FileSystem fs = FileSystem.get(output.toUri(), conf);
+    output = fs.makeQualified(output);
+    if (overwritePath) {
+      HadoopUtil.delete(conf, output);
+    }
+
+    // set the cache
+    DistributedCache.setCacheFiles(new URI[]{output.toUri()}, conf);
+
+    // set up the writer
+    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, output,
+            IntWritable.class, VectorWritable.class);
+    try {
+      writer.append(key, new VectorWritable(vector));
+    } finally {
+      Closeables.close(writer, false);
+    }
+
+    if (deleteOnExit) {
+      fs.deleteOnExit(output);
+    }
+  }
+
+  /**
+   * Calls the save() method, setting the cache to overwrite any previous
+   * Path and to delete the path after exiting
+   */
+  public static void save(Writable key, Vector vector, Path output, Configuration conf) throws IOException {
+    save(key, vector, output, conf, true, true);
+  }
+
+  /**
+   * Loads the vector from {@link DistributedCache}.
+   *
+   * @return the first vector stored in the cached file, or null if that file holds no vector
+   * @throws IOException if the cache does not contain exactly one file
+   */
+  public static Vector load(Configuration conf) throws IOException {
+    Path[] files = HadoopUtil.getCachedFiles(conf);
+
+    if (files.length != 1) {
+      throw new IOException("Cannot read vector from Distributed Cache (" + files.length + ')');
+    }
+
+    if (log.isInfoEnabled()) {
+      log.info("Files are: {}", Arrays.toString(files));
+    }
+    return load(conf, files[0]);
+  }
+
+  /**
+   * Loads a Vector from the specified path. Returns null if no vector exists.
+   *
+   * @param input SequenceFile whose first value is the vector to return
+   * @return the first vector in the file, or null if the file contains no records
+   */
+  public static Vector load(Configuration conf, Path input) throws IOException {
+    log.info("Loading vector from: {}", input);
+    SequenceFileValueIterator<VectorWritable> iterator =
+            new SequenceFileValueIterator<>(input, true, conf);
+    try {
+      // an empty file means "no vector"; report that as null instead of failing in next()
+      if (!iterator.hasNext()) {
+        return null;
+      }
+      return iterator.next().get();
+    } finally {
+      Closeables.close(iterator, true);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/spectral/VectorMatrixMultiplicationJob.java
----------------------------------------------------------------------
diff --git 
a/mr/src/main/java/org/apache/mahout/clustering/spectral/VectorMatrixMultiplicationJob.java
 
b/mr/src/main/java/org/apache/mahout/clustering/spectral/VectorMatrixMultiplicationJob.java
new file mode 100644
index 0000000..c42ab70
--- /dev/null
+++ 
b/mr/src/main/java/org/apache/mahout/clustering/spectral/VectorMatrixMultiplicationJob.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.function.Functions;
+import org.apache.mahout.math.hadoop.DistributedRowMatrix;
+
+/**
+ * <p>This class handles the three-way multiplication of the diagonal matrix
+ * and the Markov transition matrix inherent in the Eigencuts algorithm.
+ * The equation takes the form:</p>
+ *
+ * <p>{@code W = D^(1/2) * M * D^(1/2)}</p>
+ *
+ * <p>Since the diagonal matrix D has only n non-zero elements, it is represented
+ * as a dense vector in this job, rather than a full n-by-n matrix. This job
+ * performs the multiplications and returns the new DRM.</p>
+ */
+public final class VectorMatrixMultiplicationJob {
+
+  private VectorMatrixMultiplicationJob() {
+  }
+
+  /**
+   * Invokes the job, using the "tmp" child of {@code outputPath} as the temporary path of the returned DRM.
+   *
+   * @param markovPath Path to the markov DRM's sequence files
+   * @param diag       the diagonal of D, as a vector
+   * @param outputPath where the job writes the resulting rows
+   * @return a DRM over {@code outputPath}
+   */
+  public static DistributedRowMatrix runJob(Path markovPath, Vector diag, Path outputPath)
+    throws IOException, ClassNotFoundException, InterruptedException {
+
+    return runJob(markovPath, diag, outputPath, new Path(outputPath, "tmp"));
+  }
+
+  /**
+   * Invokes the job.
+   *
+   * @param markovPath Path to the markov DRM's sequence files
+   * @param diag       the diagonal of D, as a vector; its size gives both dimensions of the returned DRM
+   * @param outputPath where the job writes the resulting rows
+   * @param tmpPath    temporary path handed to the returned DRM
+   * @return a DRM over {@code outputPath}
+   * @throws IllegalStateException if the job does not complete successfully
+   */
+  public static DistributedRowMatrix runJob(Path markovPath, Vector diag, Path outputPath, Path tmpPath)
+    throws IOException, ClassNotFoundException, InterruptedException {
+
+    // set up the serialization of the diagonal vector
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(markovPath.toUri(), conf);
+    markovPath = fs.makeQualified(markovPath);
+    outputPath = fs.makeQualified(outputPath);
+    Path vectorOutputPath = new Path(outputPath.getParent(), "vector");
+    VectorCache.save(new IntWritable(Keys.DIAGONAL_CACHE_INDEX), diag, vectorOutputPath, conf);
+
+    // set up the job itself
+    Job job = new Job(conf, "VectorMatrixMultiplication");
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    job.setOutputKeyClass(IntWritable.class);
+    job.setOutputValueClass(VectorWritable.class);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    job.setMapperClass(VectorMatrixMultiplicationMapper.class);
+    job.setNumReduceTasks(0);
+
+    FileInputFormat.addInputPath(job, markovPath);
+    FileOutputFormat.setOutputPath(job, outputPath);
+
+    job.setJarByClass(VectorMatrixMultiplicationJob.class);
+
+    boolean succeeded = job.waitForCompletion(true);
+    if (!succeeded) {
+      throw new IllegalStateException("Job failed!");
+    }
+
+    // build the resulting DRM from the results
+    return new DistributedRowMatrix(outputPath, tmpPath,
+        diag.size(), diag.size());
+  }
+
+  /**
+   * Scales every element m_ij of a row i by sqrt(d_i) * sqrt(d_j), where d is the cached diagonal vector.
+   */
+  public static class VectorMatrixMultiplicationMapper
+    extends Mapper<IntWritable, VectorWritable, IntWritable, VectorWritable> {
+
+    private Vector diagonal;
+
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+      // read in the diagonal vector from the distributed cache
+      super.setup(context);
+      Configuration config = context.getConfiguration();
+      diagonal = VectorCache.load(config);
+      if (diagonal == null) {
+        throw new IOException("No vector loaded from cache!");
+      }
+      if (!(diagonal instanceof DenseVector)) {
+        diagonal = new DenseVector(diagonal);
+      }
+    }
+
+    @Override
+    protected void map(IntWritable key, VectorWritable row, Context ctx)
+      throws IOException, InterruptedException {
+
+      // the factor belonging to this row is the same for every element, so it is looked up once
+      double dii = Functions.SQRT.apply(diagonal.get(key.get()));
+      for (Vector.Element e : row.get().all()) {
+        double djj = Functions.SQRT.apply(diagonal.get(e.index()));
+        double mij = e.get();
+        e.set(dii * mij * djj);
+      }
+      ctx.write(key, row);
+    }
+
+    /**
+     * Performs the setup of the Mapper. Used by unit tests.
+     * @param diag the diagonal vector to use in place of the one loaded from the cache
+     */
+    void setup(Vector diag) {
+      this.diagonal = diag;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/spectral/VertexWritable.java
----------------------------------------------------------------------
diff --git 
a/mr/src/main/java/org/apache/mahout/clustering/spectral/VertexWritable.java 
b/mr/src/main/java/org/apache/mahout/clustering/spectral/VertexWritable.java
new file mode 100644
index 0000000..0d70cac
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/clustering/spectral/VertexWritable.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Represents a vertex within the affinity graph for Eigencuts.
+ */
+public class VertexWritable implements Writable {
+
+  /** the row */
+  private int i;
+
+  /** the column */
+  private int j;
+
+  /** the value at this vertex */
+  private double value;
+
+  /** an extra type delimiter, can probably be null */
+  private String type;
+
+  public VertexWritable() {
+  }
+
+  /**
+   * @param i the row
+   * @param j the column
+   * @param v the value at this vertex
+   * @param t the type delimiter; may be null
+   */
+  public VertexWritable(int i, int j, double v, String t) {
+    this.i = i;
+    this.j = j;
+    this.value = v;
+    this.type = t;
+  }
+
+  public int getRow() {
+    return i;
+  }
+
+  public void setRow(int i) {
+    this.i = i;
+  }
+
+  public int getCol() {
+    return j;
+  }
+
+  public void setCol(int j) {
+    this.j = j;
+  }
+
+  public double getValue() {
+    return value;
+  }
+
+  public void setValue(double v) {
+    this.value = v;
+  }
+
+  public String getType() {
+    return type;
+  }
+
+  public void setType(String t) {
+    this.type = t;
+  }
+
+  /**
+   * Reads row, column, value and type, in the order {@link #write} emits them.
+   * A type that was null when written comes back as the empty string.
+   */
+  @Override
+  public void readFields(DataInput arg0) throws IOException {
+    this.i = arg0.readInt();
+    this.j = arg0.readInt();
+    this.value = arg0.readDouble();
+    this.type = arg0.readUTF();
+  }
+
+  /**
+   * Writes row, column, value and type. A null type is written as the empty
+   * string, because {@link DataOutput#writeUTF} does not accept null and an
+   * instance created with the no-argument constructor has no type yet.
+   */
+  @Override
+  public void write(DataOutput arg0) throws IOException {
+    arg0.writeInt(i);
+    arg0.writeInt(j);
+    arg0.writeDouble(value);
+    arg0.writeUTF(type == null ? "" : type);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/spectral/kmeans/EigenSeedGenerator.java
----------------------------------------------------------------------
diff --git 
a/mr/src/main/java/org/apache/mahout/clustering/spectral/kmeans/EigenSeedGenerator.java
 
b/mr/src/main/java/org/apache/mahout/clustering/spectral/kmeans/EigenSeedGenerator.java
new file mode 100644
index 0000000..5f9c1a6
--- /dev/null
+++ 
b/mr/src/main/java/org/apache/mahout/clustering/spectral/kmeans/EigenSeedGenerator.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral.kmeans;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.clustering.iterator.ClusterWritable;
+import org.apache.mahout.clustering.kmeans.Kluster;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+import com.google.common.io.Closeables;
+
+/**
+ * Given an Input Path containing a {@link org.apache.hadoop.io.SequenceFile}, select k vectors and write them to the
+ * output file as a {@link org.apache.mahout.clustering.kmeans.Kluster} representing the initial centroid to use. The
+ * selection criterion is the rows with max value in that respective column
+ */
+public final class EigenSeedGenerator {
+
+  private static final Logger log = LoggerFactory.getLogger(EigenSeedGenerator.class);
+
+  public static final String K = "k";
+
+  private EigenSeedGenerator() {}
+
+  /**
+   * Scans the input vectors and, for every column that holds a non-zero value, keeps the row whose
+   * absolute value in that column is largest. Each kept row is written to the seed file as a Kluster
+   * whose id is the column index.
+   *
+   * @param conf    configuration used for file system access
+   * @param input   a SequenceFile, or a directory of them, with VectorWritable values
+   * @param output  directory that receives the seed file; it is deleted first
+   * @param k       used as the expected size of the internal per-column maps
+   * @param measure distance measure handed to every created Kluster
+   * @return path of the seed file ("part-eigenSeed" inside {@code output}); nothing is written to it
+   *         when the file could not be newly created
+   */
+  public static Path buildFromEigens(Configuration conf, Path input, Path output, int k, DistanceMeasure measure)
+      throws IOException {
+    // delete the output directory
+    FileSystem fs = FileSystem.get(output.toUri(), conf);
+    HadoopUtil.delete(conf, output);
+    Path outFile = new Path(output, "part-eigenSeed");
+    boolean newFile = fs.createNewFile(outFile);
+    if (newFile) {
+      Path inputPathPattern;
+
+      if (fs.getFileStatus(input).isDir()) {
+        inputPathPattern = new Path(input, "*");
+      } else {
+        inputPathPattern = input;
+      }
+
+      FileStatus[] inputFiles = fs.globStatus(inputPathPattern, PathFilters.logsCRCFilter());
+
+      // store max value of each column
+      Map<Integer,Double> maxEigens = Maps.newHashMapWithExpectedSize(k);
+      Map<Integer,Text> chosenTexts = Maps.newHashMapWithExpectedSize(k);
+      Map<Integer,ClusterWritable> chosenClusters = Maps.newHashMapWithExpectedSize(k);
+
+      SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, outFile, Text.class, ClusterWritable.class);
+      // the try covers the input scan as well, so the writer is closed even when reading fails
+      try {
+        for (FileStatus fileStatus : inputFiles) {
+          if (!fileStatus.isDir()) {
+            for (Pair<Writable,VectorWritable> record : new SequenceFileIterable<Writable,VectorWritable>(
+                fileStatus.getPath(), true, conf)) {
+              Writable key = record.getFirst();
+              VectorWritable value = record.getSecond();
+
+              for (Vector.Element e : value.get().nonZeroes()) {
+                int index = e.index();
+                double v = Math.abs(e.get());
+
+                Double currentMax = maxEigens.get(index);
+                if (currentMax == null || v > currentMax) {
+                  maxEigens.put(index, v);
+                  Text newText = new Text(key.toString());
+                  chosenTexts.put(index, newText);
+                  Kluster newCluster = new Kluster(value.get(), index, measure);
+                  newCluster.observe(value.get(), 1);
+                  ClusterWritable clusterWritable = new ClusterWritable();
+                  clusterWritable.setValue(newCluster);
+                  chosenClusters.put(index, clusterWritable);
+                }
+              }
+            }
+          }
+        }
+
+        for (Integer key : maxEigens.keySet()) {
+          writer.append(chosenTexts.get(key), chosenClusters.get(key));
+        }
+        log.info("EigenSeedGenerator:: Wrote {} Klusters to {}", chosenTexts.size(), outFile);
+      } finally {
+        Closeables.close(writer, false);
+      }
+    }
+
+    return outFile;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java
----------------------------------------------------------------------
diff --git 
a/mr/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java
 
b/mr/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java
new file mode 100644
index 0000000..427de91
--- /dev/null
+++ 
b/mr/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.spectral.kmeans;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.classify.WeightedVectorWritable;
+import org.apache.mahout.clustering.kmeans.KMeansDriver;
+import org.apache.mahout.clustering.spectral.AffinityMatrixInputJob;
+import org.apache.mahout.clustering.spectral.MatrixDiagonalizeJob;
+import org.apache.mahout.clustering.spectral.UnitVectorizerJob;
+import org.apache.mahout.clustering.spectral.VectorMatrixMultiplicationJob;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.ClassUtils;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.hadoop.DistributedRowMatrix;
+import org.apache.mahout.math.hadoop.stochasticsvd.SSVDSolver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Performs spectral k-means clustering on the top k eigenvectors of the input affinity matrix.
+ *
+ * <p>The driver converts the raw affinity input into sequence files, builds the normalized Laplacian,
+ * decomposes it with SSVD, normalizes the rows of the resulting U matrix to unit length, seeds k-means with
+ * {@code EigenSeedGenerator}, runs {@link KMeansDriver} and finally logs the cluster assignment of every point.
+ */
+public class SpectralKMeansDriver extends AbstractJob {
+  private static final Logger log = LoggerFactory.getLogger(SpectralKMeansDriver.class);
+
+  /** Default value of the {@code reduceTasks} option (number of reducers for SSVD). */
+  public static final int REDUCERS = 10;
+  /** Default value of the {@code outerProdBlockHeight} option (block height of outer products for SSVD). */
+  public static final int BLOCKHEIGHT = 30000;
+  /** Default value of the {@code oversampling} option (oversampling parameter for SSVD). */
+  public static final int OVERSAMPLING = 15;
+  /** Default value of the {@code powerIter} option (additional power iterations for SSVD). */
+  public static final int POWERITERS = 0;
+
+  /**
+   * Command-line entry point; hands the arguments to {@link ToolRunner}.
+   *
+   * @param args
+   *          the command-line arguments
+   * @throws Exception
+   *           if the job fails
+   */
+  public static void main(String[] args) throws Exception {
+    // The exit code returned by ToolRunner is deliberately left unused so that callers which invoke
+    // main() in-process are not terminated.
+    ToolRunner.run(new SpectralKMeansDriver(), args);
+  }
+
+  /**
+   * Registers the command-line options, parses the arguments and runs the clustering.
+   *
+   * @param arg0
+   *          the command-line arguments
+   * @return 0, both after a completed run and when argument parsing yields no arguments
+   * @throws Exception
+   *           if parsing the option values or running the jobs fails
+   */
+  @Override
+  public int run(String[] arg0) throws Exception {
+
+    Configuration conf = getConf();
+    addInputOption();
+    addOutputOption();
+    addOption("dimensions", "d", "Square dimensions of affinity matrix", true);
+    addOption("clusters", "k", "Number of clusters and top eigenvectors", true);
+    addOption(DefaultOptionCreator.distanceMeasureOption().create());
+    addOption(DefaultOptionCreator.convergenceOption().create());
+    addOption(DefaultOptionCreator.maxIterationsOption().create());
+    addOption(DefaultOptionCreator.overwriteOption().create());
+    // NOTE(review): this flag is registered but never read in this class; the static run() below always
+    // uses SSVDSolver. Confirm whether the flag (and its help text) is still needed.
+    addFlag("usessvd", "ssvd", "Uses SSVD as the eigensolver. Default is the Lanczos solver.");
+    addOption("reduceTasks", "t", "Number of reducers for SSVD", String.valueOf(REDUCERS));
+    addOption("outerProdBlockHeight", "oh", "Block height of outer products for SSVD", String.valueOf(BLOCKHEIGHT));
+    addOption("oversampling", "p", "Oversampling parameter for SSVD", String.valueOf(OVERSAMPLING));
+    addOption("powerIter", "q", "Additional power iterations for SSVD", String.valueOf(POWERITERS));
+
+    Map<String, List<String>> parsedArgs = parseArguments(arg0);
+    if (parsedArgs == null) {
+      return 0;
+    }
+
+    Path input = getInputPath();
+    Path output = getOutputPath();
+    if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
+      HadoopUtil.delete(conf, getTempPath());
+      HadoopUtil.delete(conf, getOutputPath());
+    }
+    int numDims = Integer.parseInt(getOption("dimensions"));
+    int clusters = Integer.parseInt(getOption("clusters"));
+    String measureClass = getOption(DefaultOptionCreator.DISTANCE_MEASURE_OPTION);
+    DistanceMeasure measure = ClassUtils.instantiateAs(measureClass, DistanceMeasure.class);
+    double convergenceDelta = Double.parseDouble(getOption(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION));
+    int maxIterations = Integer.parseInt(getOption(DefaultOptionCreator.MAX_ITERATIONS_OPTION));
+
+    Path tempdir = new Path(getOption("tempDir"));
+    int reducers = Integer.parseInt(getOption("reduceTasks"));
+    int blockheight = Integer.parseInt(getOption("outerProdBlockHeight"));
+    int oversampling = Integer.parseInt(getOption("oversampling"));
+    int poweriters = Integer.parseInt(getOption("powerIter"));
+    run(conf, input, output, numDims, clusters, measure, convergenceDelta, maxIterations, tempdir, reducers,
+        blockheight, oversampling, poweriters);
+
+    return 0;
+  }
+
+  /**
+   * Run the Spectral KMeans clustering on the supplied arguments, using {@link #REDUCERS},
+   * {@link #BLOCKHEIGHT}, {@link #OVERSAMPLING} and {@link #POWERITERS} for the SSVD parameters.
+   *
+   * @param conf
+   *          the Configuration to be used
+   * @param input
+   *          the Path to the input tuples directory
+   * @param output
+   *          the Path to the output directory
+   * @param numDims
+   *          the int number of dimensions of the affinity matrix
+   * @param clusters
+   *          the int number of eigenvectors and thus clusters to produce
+   * @param measure
+   *          the DistanceMeasure for the k-Means calculations
+   * @param convergenceDelta
+   *          the double convergence delta for the k-Means calculations
+   * @param maxIterations
+   *          the int maximum number of iterations for the k-Means calculations
+   * @param tempDir
+   *          Temporary directory for intermediate calculations
+   */
+  public static void run(Configuration conf, Path input, Path output, int numDims, int clusters,
+                         DistanceMeasure measure, double convergenceDelta, int maxIterations, Path tempDir)
+      throws IOException, InterruptedException, ClassNotFoundException {
+    run(conf, input, output, numDims, clusters, measure, convergenceDelta, maxIterations, tempDir, REDUCERS,
+        BLOCKHEIGHT, OVERSAMPLING, POWERITERS);
+  }
+
+  /**
+   * Run the Spectral KMeans clustering on the supplied arguments
+   *
+   * @param conf
+   *          the Configuration to be used
+   * @param input
+   *          the Path to the input tuples directory
+   * @param output
+   *          the Path to the output directory
+   * @param numDims
+   *          the int number of dimensions of the affinity matrix
+   * @param clusters
+   *          the int number of eigenvectors and thus clusters to produce
+   * @param measure
+   *          the DistanceMeasure for the k-Means calculations
+   * @param convergenceDelta
+   *          the double convergence delta for the k-Means calculations
+   * @param maxIterations
+   *          the int maximum number of iterations for the k-Means calculations
+   * @param tempDir
+   *          Temporary directory for intermediate calculations; it is deleted before the run starts
+   * @param numReducers
+   *          Number of reducers for SSVD
+   * @param blockHeight
+   *          Block height of outer products for SSVD
+   * @param oversampling
+   *          Oversampling parameter for SSVD
+   * @param poweriters
+   *          Additional power iterations for SSVD
+   */
+  public static void run(Configuration conf, Path input, Path output, int numDims, int clusters,
+                         DistanceMeasure measure, double convergenceDelta, int maxIterations, Path tempDir,
+                         int numReducers, int blockHeight, int oversampling, int poweriters)
+      throws IOException, InterruptedException, ClassNotFoundException {
+
+    HadoopUtil.delete(conf, tempDir);
+    Path outputCalc = new Path(tempDir, "calculations");
+    Path outputTmp = new Path(tempDir, "temporary");
+
+    // Take in the raw CSV text file and split it ourselves,
+    // creating our own SequenceFiles for the matrices to read later
+    // (similar to the style of syntheticcontrol.canopy.InputMapper)
+    Path affSeqFiles = new Path(outputCalc, "seqfile");
+    AffinityMatrixInputJob.runJob(input, affSeqFiles, numDims, numDims);
+
+    // Construct the affinity matrix using the newly-created sequence files
+    // NOTE(review): A is configured here but not referenced again in this method.
+    DistributedRowMatrix A = new DistributedRowMatrix(affSeqFiles, new Path(outputTmp, "afftmp"), numDims, numDims);
+
+    Configuration depConf = new Configuration(conf);
+    A.setConf(depConf);
+
+    // Construct the diagonal matrix D (represented as a vector)
+    Vector D = MatrixDiagonalizeJob.runJob(affSeqFiles, numDims);
+
+    // Calculate the normalized Laplacian of the form: L = D^(-0.5)AD^(-0.5)
+    // NOTE(review): the last argument nests outputCalc under itself; kept as-is, but confirm it is intended.
+    DistributedRowMatrix L = VectorMatrixMultiplicationJob.runJob(affSeqFiles, D, new Path(outputCalc, "laplacian"),
+        new Path(outputCalc, outputCalc));
+    L.setConf(depConf);
+
+    // SSVD requires an array of Paths to function. So we pass in an array of length one
+    Path[] laplacianPaths = {L.getRowPath()};
+
+    Path ssvdOutput = new Path(outputCalc, "SSVD");
+
+    SSVDSolver solver = new SSVDSolver(depConf, laplacianPaths, ssvdOutput, blockHeight, clusters, oversampling,
+        numReducers);
+
+    // Only the U matrix is consumed below, so V is not computed.
+    solver.setComputeV(false);
+    solver.setComputeU(true);
+    solver.setOverwrite(true);
+    solver.setQ(poweriters);
+    solver.run();
+    Path data = new Path(solver.getUPath());
+
+    // Normalize the rows of Wt to unit length
+    // normalize is important because it reduces the occurrence of two unique clusters combining into one
+    Path unitVectors = new Path(outputCalc, "unitvectors");
+
+    UnitVectorizerJob.runJob(data, unitVectors);
+
+    DistributedRowMatrix Wt = new DistributedRowMatrix(unitVectors, new Path(unitVectors, "tmp"), clusters, numDims);
+    Wt.setConf(depConf);
+    data = Wt.getRowPath();
+
+    // Generate initial clusters using EigenSeedGenerator which picks rows as centroids if that row contains max
+    // eigen value in that column
+    Path initialclusters = EigenSeedGenerator.buildFromEigens(conf, data,
+        new Path(output, Cluster.INITIAL_CLUSTERS_DIR), clusters, measure);
+
+    // Run the KMeansDriver
+    Path answer = new Path(output, "kmeans_out");
+    KMeansDriver.run(conf, data, initialclusters, answer, convergenceDelta, maxIterations, true, 0.0, false);
+
+    // Restore name to id mapping and read through the cluster assignments
+    Path mappingPath = new Path(new Path(conf.get("hadoop.tmp.dir")), "generic_input_mapping");
+    List<String> mapping = new ArrayList<>();
+    FileSystem fs = FileSystem.get(mappingPath.toUri(), conf);
+    if (fs.exists(mappingPath)) {
+      // try-with-resources guarantees the reader is closed even if reading fails part-way through
+      try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, mappingPath, conf)) {
+        Text mappingValue = new Text();
+        IntWritable mappingIndex = new IntWritable();
+        while (reader.next(mappingIndex, mappingValue)) {
+          mapping.add(mappingValue.toString());
+        }
+      }
+      // Remove the mapping file only once the reader has been closed
+      HadoopUtil.delete(conf, mappingPath);
+    } else {
+      log.warn("generic input mapping file not found!");
+    }
+
+    Path clusteredPointsPath = new Path(answer, "clusteredPoints");
+    // NOTE(review): only the first part file is read; confirm k-means never writes additional part files here.
+    Path inputPath = new Path(clusteredPointsPath, "part-m-00000");
+    int id = 0;
+    for (Pair<IntWritable, WeightedVectorWritable> record :
+        new SequenceFileIterable<IntWritable, WeightedVectorWritable>(inputPath, conf)) {
+      int clusterId = record.getFirst().get();
+      if (id < mapping.size()) {
+        log.info("{}: {}", mapping.get(id), clusterId);
+      } else {
+        // No name is known for this point (mapping missing or shorter than the point list): log its ordinal
+        log.info("{}: {}", id, clusterId);
+      }
+      id++;
+    }
+  }
+}

Reply via email to