http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/lda/cvb/TopicModel.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/clustering/lda/cvb/TopicModel.java b/mr/src/main/java/org/apache/mahout/clustering/lda/cvb/TopicModel.java new file mode 100644 index 0000000..7b7816c --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/clustering/lda/cvb/TopicModel.java @@ -0,0 +1,513 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
package org.apache.mahout.clustering.lda.cvb;

import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.mahout.common.Pair;
import org.apache.mahout.common.RandomUtils;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
import org.apache.mahout.math.DenseMatrix;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.DistributedRowMatrixWriter;
import org.apache.mahout.math.Matrix;
import org.apache.mahout.math.MatrixSlice;
import org.apache.mahout.math.SequentialAccessSparseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.Vector.Element;
import org.apache.mahout.math.VectorWritable;
import org.apache.mahout.math.function.Functions;
import org.apache.mahout.math.stats.Sampler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Thin wrapper around a {@link Matrix} of counts of occurrences of (topic, term) pairs.  Dividing
 * {@code topicTermCount.viewRow(topic).get(term)} by the sum over the values for all terms in that
 * row yields p(term | topic).  Instead dividing it by all topic columns for that term yields
 * p(topic | term).
 *
 * Multithreading is enabled for the {@code update(Matrix)} method: this method is async, and
 * merely submits the matrix rows to per-thread work queues.  When all work has been submitted,
 * {@link #stop()} should be called, which will block until updates have been
 * accumulated (or its timeouts expire).
 */
public class TopicModel implements Configurable, Iterable<MatrixSlice> {

  private static final Logger log = LoggerFactory.getLogger(TopicModel.class);

  private final String[] dictionary;
  private final Matrix topicTermCounts;
  private final Vector topicSums;
  private final int numTopics;
  private final int numTerms;
  private final double eta;
  private final double alpha;

  private Configuration conf;

  private final Sampler sampler;
  private final int numThreads;
  private ThreadPoolExecutor threadPool;
  private Updater[] updaters;

  /** @return the number of columns of the topic-term count matrix */
  public int getNumTerms() {
    return numTerms;
  }

  /** @return the number of topics (the size of the topic-sums vector) */
  public int getNumTopics() {
    return numTopics;
  }

  /** Single-threaded model over an all-zero count matrix whose topic sums start at 1.0. */
  public TopicModel(int numTopics, int numTerms, double eta, double alpha, String[] dictionary,
      double modelWeight) {
    this(numTopics, numTerms, eta, alpha, null, dictionary, 1, modelWeight);
  }

  /** Builds the model from the sequence files at {@code modelpath} (see {@link #loadModel}). */
  public TopicModel(Configuration conf, double eta, double alpha,
      String[] dictionary, int numThreads, double modelWeight, Path... modelpath) throws IOException {
    this(loadModel(conf, modelpath), eta, alpha, dictionary, numThreads, modelWeight);
  }

  /** Builds a model over freshly allocated dense counts and topic sums. */
  public TopicModel(int numTopics, int numTerms, double eta, double alpha, String[] dictionary,
      int numThreads, double modelWeight) {
    this(new DenseMatrix(numTopics, numTerms), new DenseVector(numTopics), eta, alpha, dictionary,
        numThreads, modelWeight);
  }

  /**
   * Builds a model whose counts are drawn from {@code random}; when {@code random} is
   * {@code null} the counts stay zero and every topic sum is set to 1.0.
   */
  public TopicModel(int numTopics, int numTerms, double eta, double alpha, Random random,
      String[] dictionary, int numThreads, double modelWeight) {
    this(randomMatrix(numTopics, numTerms, random), eta, alpha, dictionary, numThreads, modelWeight);
  }

  private TopicModel(Pair<Matrix, Vector> model, double eta, double alpha, String[] dict,
      int numThreads, double modelWeight) {
    this(model.getFirst(), model.getSecond(), eta, alpha, dict, numThreads, modelWeight);
  }

  /** Single-threaded variant of the full constructor. */
  public TopicModel(Matrix topicTermCounts, Vector topicSums, double eta, double alpha,
      String[] dictionary, double modelWeight) {
    this(topicTermCounts, topicSums, eta, alpha, dictionary, 1, modelWeight);
  }

  /** Derives the topic sums as the L1 norm of each row of {@code topicTermCounts}. */
  public TopicModel(Matrix topicTermCounts, double eta, double alpha, String[] dictionary,
      int numThreads, double modelWeight) {
    this(topicTermCounts, viewRowSums(topicTermCounts),
        eta, alpha, dictionary, numThreads, modelWeight);
  }

  /**
   * Full constructor.  The supplied matrix and vector are used directly (not copied); when
   * {@code modelWeight != 1} both are scaled in place by that factor.  Starts the updater
   * thread pool.
   *
   * @param topicTermCounts (topic, term) counts, one row per topic
   * @param topicSums per-topic totals; its size defines the number of topics
   * @param eta smoothing added to term counts
   * @param alpha smoothing added to topic weights
   * @param dictionary optional term strings used by {@link #toString()}, may be {@code null}
   * @param numThreads number of updater threads
   * @param modelWeight scale factor applied to the initial counts and sums
   */
  public TopicModel(Matrix topicTermCounts, Vector topicSums, double eta, double alpha,
      String[] dictionary, int numThreads, double modelWeight) {
    this.dictionary = dictionary;
    this.topicTermCounts = topicTermCounts;
    this.topicSums = topicSums;
    this.numTopics = topicSums.size();
    this.numTerms = topicTermCounts.numCols();
    this.eta = eta;
    this.alpha = alpha;
    this.sampler = new Sampler(RandomUtils.getRandom());
    this.numThreads = numThreads;
    if (modelWeight != 1) {
      topicSums.assign(Functions.mult(modelWeight));
      for (int x = 0; x < numTopics; x++) {
        topicTermCounts.viewRow(x).assign(Functions.mult(modelWeight));
      }
    }
    initializeThreadPool();
  }

  /** Returns a dense vector holding the L1 norm of every row slice of {@code m}. */
  private static Vector viewRowSums(Matrix m) {
    Vector v = new DenseVector(m.numRows());
    for (MatrixSlice slice : m) {
      v.set(slice.index(), slice.vector().norm(1));
    }
    return v;
  }

  /**
   * (Re)creates the thread pool and one {@link Updater} per thread, shutting down any
   * previous pool first (waiting at most 100 seconds for it).
   */
  private synchronized void initializeThreadPool() {
    if (threadPool != null) {
      threadPool.shutdown();
      try {
        threadPool.awaitTermination(100, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
        log.error("Could not terminate all threads for TopicModel in time.", e);
      }
    }
    threadPool = new ThreadPoolExecutor(numThreads, numThreads, 0, TimeUnit.SECONDS,
        new ArrayBlockingQueue<Runnable>(numThreads * 10));
    threadPool.allowCoreThreadTimeOut(false);
    updaters = new Updater[numThreads];
    for (int i = 0; i < numThreads; i++) {
      updaters[i] = new Updater();
      threadPool.submit(updaters[i]);
    }
  }

  Matrix topicTermCounts() {
    return topicTermCounts;
  }

  @Override
  public Iterator<MatrixSlice> iterator() {
    return topicTermCounts.iterateAll();
  }

  public Vector topicSums() {
    return topicSums;
  }

  /**
   * Creates a dense {@code numTopics x numTerms} matrix together with its row sums.  With a
   * non-null {@code random} each cell is {@code random.nextDouble()} and the sums are the row
   * L1 norms; with a {@code null} random the matrix stays zero and each sum is 1.0.
   */
  private static Pair<Matrix,Vector> randomMatrix(int numTopics, int numTerms, Random random) {
    Matrix topicTermCounts = new DenseMatrix(numTopics, numTerms);
    Vector topicSums = new DenseVector(numTopics);
    if (random != null) {
      for (int x = 0; x < numTopics; x++) {
        for (int term = 0; term < numTerms; term++) {
          topicTermCounts.viewRow(x).set(term, random.nextDouble());
        }
      }
    }
    for (int x = 0; x < numTopics; x++) {
      topicSums.set(x, random == null ? 1.0 : topicTermCounts.viewRow(x).norm(1));
    }
    return Pair.of(topicTermCounts, topicSums);
  }

  /**
   * Reads (topic index, term vector) rows from every path and assembles them into a dense
   * matrix plus per-topic L1 sums.  The number of topics is the largest row index seen plus
   * one; the number of terms is the size of the first vector read.
   *
   * @throws IOException if reading fails or no rows were found under {@code modelPaths}
   */
  public static Pair<Matrix, Vector> loadModel(Configuration conf, Path... modelPaths)
    throws IOException {
    int numTopics = -1;
    int numTerms = -1;
    List<Pair<Integer, Vector>> rows = Lists.newArrayList();
    for (Path modelPath : modelPaths) {
      for (Pair<IntWritable, VectorWritable> row
          : new SequenceFileIterable<IntWritable, VectorWritable>(modelPath, true, conf)) {
        rows.add(Pair.of(row.getFirst().get(), row.getSecond().get()));
        numTopics = Math.max(numTopics, row.getFirst().get());
        if (numTerms < 0) {
          numTerms = row.getSecond().get().size();
        }
      }
    }
    if (rows.isEmpty()) {
      throw new IOException(Arrays.toString(modelPaths) + " have no vectors in it");
    }
    // row indices are zero-based, so the count is the max index + 1
    numTopics++;
    Matrix model = new DenseMatrix(numTopics, numTerms);
    Vector topicSums = new DenseVector(numTopics);
    for (Pair<Integer, Vector> pair : rows) {
      model.viewRow(pair.getFirst()).assign(pair.getSecond());
      topicSums.set(pair.getFirst(), pair.getSecond().norm(1));
    }
    return Pair.of(model, topicSums);
  }

  // NOTE: this is purely for debug purposes.  It is not performant to "toString()" a real model
  @Override
  public String toString() {
    StringBuilder buf = new StringBuilder();
    for (int x = 0; x < numTopics; x++) {
      String v = dictionary != null
          ? vectorToSortedString(topicTermCounts.viewRow(x).normalize(1), dictionary)
          : topicTermCounts.viewRow(x).asFormatString();
      buf.append(v).append('\n');
    }
    return buf.toString();
  }

  /** Samples a topic from {@code topicDistribution}, then a term from that topic's row. */
  public int sampleTerm(Vector topicDistribution) {
    return sampler.sample(topicTermCounts.viewRow(sampler.sample(topicDistribution)));
  }

  /** Samples a term from the count row of the given topic. */
  public int sampleTerm(int topic) {
    return sampler.sample(topicTermCounts.viewRow(topic));
  }

  /**
   * Replaces every topic row with an empty sparse vector, sets all topic sums to 1.0 and
   * restarts the thread pool if it has terminated.
   */
  public synchronized void reset() {
    for (int x = 0; x < numTopics; x++) {
      topicTermCounts.assignRow(x, new SequentialAccessSparseVector(numTerms));
    }
    topicSums.assign(1.0);
    if (threadPool.isTerminated()) {
      initializeThreadPool();
    }
  }

  /**
   * Asks every updater to finish its queued work, then shuts the pool down, waiting up to
   * 60 seconds for termination.
   */
  public synchronized void stop() {
    for (Updater updater : updaters) {
      updater.shutdown();
    }
    threadPool.shutdown();
    try {
      if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
        log.warn("Threadpool timed out on await termination - jobs still running!");
      }
    } catch (InterruptedException e) {
      log.error("Interrupted shutting down!", e);
    }
  }

  /** L1-normalizes every topic row and sets all topic sums to 1.0. */
  public void renormalize() {
    for (int x = 0; x < numTopics; x++) {
      topicTermCounts.assignRow(x, topicTermCounts.viewRow(x).normalize(1));
    }
    // loop-invariant: one assignment after the rows are normalized is sufficient
    topicSums.assign(1.0);
  }

  /**
   * Fills {@code docTopicModel} with the document-weighted topic/term distribution for
   * {@code original} and overwrites {@code topics} with the re-estimated, L1-normalized
   * p(topic|doc).
   */
  public void trainDocTopicModel(Vector original, Vector topics, Matrix docTopicModel) {
    // first calculate p(topic|term,document) for all terms in original, and all topics,
    // using p(term|topic) and p(topic|doc)
    pTopicGivenTerm(original, topics, docTopicModel);
    normalizeByTopic(docTopicModel);
    // now multiply, term-by-term, by the document, to get the weighted distribution of
    // term-topic pairs from this document.
    for (Element e : original.nonZeroes()) {
      for (int x = 0; x < numTopics; x++) {
        Vector docTopicModelRow = docTopicModel.viewRow(x);
        docTopicModelRow.setQuick(e.index(), docTopicModelRow.getQuick(e.index()) * e.get());
      }
    }
    // now recalculate \(p(topic|doc)\) by summing contributions from all of pTopicGivenTerm
    topics.assign(0.0);
    for (int x = 0; x < numTopics; x++) {
      topics.set(x, docTopicModel.viewRow(x).norm(1));
    }
    // now renormalize so that \(sum_x(p(x|doc))\) = 1
    topics.assign(Functions.mult(1 / topics.norm(1)));
  }

  /**
   * For every non-zero term of {@code original}, computes
   * {@code sum_x (count(x, term) / topicSum(x)) * docTopics(x)} and returns the results in a
   * vector created with {@code original.like()}.
   */
  public Vector infer(Vector original, Vector docTopics) {
    Vector pTerm = original.like();
    for (Element e : original.nonZeroes()) {
      int term = e.index();
      // p(a) = sum_x (p(a|x) * p(x|i))
      double pA = 0;
      for (int x = 0; x < numTopics; x++) {
        pA += (topicTermCounts.viewRow(x).get(term) / topicSums.get(x)) * docTopics.get(x);
      }
      pTerm.set(term, pA);
    }
    return pTerm;
  }

  /** Queues each topic row of {@code docTopicCounts} on an updater (chosen round-robin by topic). */
  public void update(Matrix docTopicCounts) {
    for (int x = 0; x < numTopics; x++) {
      updaters[x % updaters.length].update(x, docTopicCounts.viewRow(x));
    }
  }

  /** Adds {@code docTopicCounts} to the topic's row and its L1 norm to the topic's sum. */
  public void updateTopic(int topic, Vector docTopicCounts) {
    topicTermCounts.viewRow(topic).assign(docTopicCounts, Functions.PLUS);
    topicSums.set(topic, topicSums.get(topic) + docTopicCounts.norm(1));
  }

  /** Adds {@code topicCounts[x]} to cell (x, termId) for every topic and to the topic sums. */
  public void update(int termId, Vector topicCounts) {
    for (int x = 0; x < numTopics; x++) {
      Vector v = topicTermCounts.viewRow(x);
      v.set(termId, v.get(termId) + topicCounts.get(x));
    }
    topicSums.assign(topicCounts, Functions.PLUS);
  }

  /**
   * Writes the topic-term counts to {@code outputDir}, deleting that directory first when
   * {@code overwrite} is set.
   */
  public void persist(Path outputDir, boolean overwrite) throws IOException {
    FileSystem fs = outputDir.getFileSystem(conf);
    if (overwrite) {
      fs.delete(outputDir, true); // CHECK second arg
    }
    DistributedRowMatrixWriter.write(outputDir, conf, topicTermCounts);
  }

  /**
   * Computes {@code \(p(topic x | term a, document i)\)} distributions given input document {@code i}.
   * {@code \(pTGT[x][a]\)} is the (un-normalized) {@code \(p(x|a,i)\)}, or if docTopics is {@code null},
   * {@code \(p(a|x)\)} (also un-normalized).
   *
   * @param document doc-term vector encoding {@code \(w(term a|document i)\)}.
   * @param docTopics {@code docTopics[x]} is the overall weight of topic {@code x} in given
   *          document. If {@code null}, a topic weight of {@code 1.0} is used for all topics.
   * @param termTopicDist storage for output {@code \(p(x|a,i)\)} distributions.
   */
  private void pTopicGivenTerm(Vector document, Vector docTopics, Matrix termTopicDist) {
    // for each topic x
    for (int x = 0; x < numTopics; x++) {
      // get p(topic x | document i), or 1.0 if docTopics is null
      double topicWeight = docTopics == null ? 1.0 : docTopics.get(x);
      // get w(term a | topic x)
      Vector topicTermRow = topicTermCounts.viewRow(x);
      // get \sum_a w(term a | topic x)
      double topicSum = topicSums.get(x);
      // get p(topic x | term a) distribution to update
      Vector termTopicRow = termTopicDist.viewRow(x);

      // for each term a in document i with non-zero weight
      for (Element e : document.nonZeroes()) {
        int termIndex = e.index();

        // calc un-normalized p(topic x | term a, document i)
        double termTopicLikelihood = (topicTermRow.get(termIndex) + eta) * (topicWeight + alpha)
            / (topicSum + eta * numTerms);
        termTopicRow.set(termIndex, termTopicLikelihood);
      }
    }
  }

  /**
   * Returns {@code -sum_a (c_ai * log(sum_x (p(x|i) * p(a|x))))} over the non-zero terms
   * {@code a} of the document, with both probabilities smoothed by {@code alpha} and
   * {@code eta} respectively.
   */
  public double perplexity(Vector document, Vector docTopics) {
    double perplexity = 0;
    double norm = docTopics.norm(1) + (docTopics.size() * alpha);
    for (Element e : document.nonZeroes()) {
      int term = e.index();
      double prob = 0;
      for (int x = 0; x < numTopics; x++) {
        double d = (docTopics.get(x) + alpha) / norm;
        double p = d * (topicTermCounts.viewRow(x).get(term) + eta)
                   / (topicSums.get(x) + eta * numTerms);
        prob += p;
      }
      perplexity += e.get() * Math.log(prob);
    }
    return -perplexity;
  }

  private void normalizeByTopic(Matrix perTopicSparseDistributions) {
    // then make sure that each of these is properly normalized by topic: sum_x(p(x|t,d)) = 1
    // (the term indices visited are those that are non-zero in row 0)
    for (Element e : perTopicSparseDistributions.viewRow(0).nonZeroes()) {
      int a = e.index();
      double sum = 0;
      for (int x = 0; x < numTopics; x++) {
        sum += perTopicSparseDistributions.viewRow(x).get(a);
      }
      for (int x = 0; x < numTopics; x++) {
        perTopicSparseDistributions.viewRow(x).set(a,
            perTopicSparseDistributions.viewRow(x).get(a) / sum);
      }
    }
  }

  /**
   * Renders at most the 25 largest non-zero entries of {@code vector} as
   * <code>{term:value,term:value,...}</code>, sorted by descending value.  Terms are looked up
   * in {@code dictionary} when it is non-null, otherwise the numeric index is printed.  A
   * vector without non-zero entries renders as <code>{}</code>.
   */
  public static String vectorToSortedString(Vector vector, String[] dictionary) {
    List<Pair<String,Double>> vectorValues = Lists.newArrayListWithCapacity(vector.getNumNondefaultElements());
    for (Element e : vector.nonZeroes()) {
      vectorValues.add(Pair.of(dictionary != null ? dictionary[e.index()] : String.valueOf(e.index()),
                               e.get()));
    }
    Collections.sort(vectorValues, new Comparator<Pair<String, Double>>() {
      @Override public int compare(Pair<String, Double> x, Pair<String, Double> y) {
        return y.getSecond().compareTo(x.getSecond());
      }
    });
    Iterator<Pair<String,Double>> listIt = vectorValues.iterator();
    StringBuilder bldr = new StringBuilder(2048);
    bldr.append('{');
    int i = 0;
    while (listIt.hasNext() && i < 25) {
      i++;
      Pair<String,Double> p = listIt.next();
      bldr.append(p.getFirst());
      bldr.append(':');
      bldr.append(p.getSecond());
      bldr.append(',');
    }
    if (bldr.length() > 1) {
      // overwrite the trailing comma with the closing brace
      bldr.setCharAt(bldr.length() - 1, '}');
    } else {
      // nothing was appended: still emit a balanced "{}"
      bldr.append('}');
    }
    return bldr.toString();
  }

  @Override
  public void setConf(Configuration configuration) {
    this.conf = configuration;
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  /**
   * Worker that drains a bounded queue of (topic, vector) pairs into
   * {@link TopicModel#updateTopic(int, Vector)} until it is shut down.
   */
  private final class Updater implements Runnable {
    private final ArrayBlockingQueue<Pair<Integer, Vector>> queue =
        new ArrayBlockingQueue<>(100);
    // volatile: written under the monitor in shutdown() but read without it in run() and
    // update(), so plain reads would carry no visibility guarantee
    private volatile boolean shutdown = false;
    // only read and written while holding this Updater's monitor
    private boolean shutdownComplete = false;

    /** Requests shutdown and waits (in 10 second slices) until {@link #run()} has finished. */
    public void shutdown() {
      try {
        synchronized (this) {
          while (!shutdownComplete) {
            shutdown = true;
            wait(10000L); // Arbitrarily, wait 10 seconds rather than forever for this
          }
        }
      } catch (InterruptedException e) {
        log.warn("Interrupted waiting to shutdown() : ", e);
      }
    }

    /**
     * Queues an update, blocking while the queue is full.
     *
     * @return {@code true} once the pair has been queued
     * @throws IllegalStateException if shutdown has already been requested
     */
    public boolean update(int topic, Vector v) {
      if (shutdown) { // maybe don't do this?
        throw new IllegalStateException("In SHUTDOWN state: cannot submit tasks");
      }
      while (true) { // keep trying if interrupted
        try {
          // start async operation by submitting to the queue
          queue.put(Pair.of(topic, v));
          // return once you got access to the queue
          return true;
        } catch (InterruptedException e) {
          log.warn("Interrupted trying to queue update:", e);
        }
      }
    }

    @Override
    public void run() {
      while (!shutdown) {
        try {
          Pair<Integer, Vector> pair = queue.poll(1, TimeUnit.SECONDS);
          if (pair != null) {
            updateTopic(pair.getFirst(), pair.getSecond());
          }
        } catch (InterruptedException e) {
          log.warn("Interrupted waiting to poll for update", e);
        }
      }
      // in shutdown mode, finish remaining tasks!
      for (Pair<Integer, Vector> pair : queue) {
        updateTopic(pair.getFirst(), pair.getSecond());
      }
      synchronized (this) {
        shutdownComplete = true;
        notifyAll();
      }
    }
  }

}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/package-info.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/clustering/package-info.java b/mr/src/main/java/org/apache/mahout/clustering/package-info.java new file mode 100644 index 0000000..9926b91 --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/clustering/package-info.java @@ -0,0 +1,13 @@ +/** + * <p>This package provides several clustering algorithm implementations. Clustering usually groups a set of + * objects into groups of similar items. The definition of similarity usually is up to you - for text documents, + * cosine-distance/-similarity is recommended. Mahout also features other types of distance measure like + * Euclidean distance.</p> + * + * <p>Input of each clustering algorithm is a set of vectors representing your items. For texts in general these are + * <a href="http://en.wikipedia.org/wiki/TFIDF">TFIDF</a> or + * <a href="http://en.wikipedia.org/wiki/Bag_of_words">Bag of words</a> representations of the documents.</p> + * + * <p>Output of each clustering algorithm is either a hard or soft assignment of items to clusters.</p> + */ +package org.apache.mahout.clustering; http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/spectral/AffinityMatrixInputJob.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/clustering/spectral/AffinityMatrixInputJob.java b/mr/src/main/java/org/apache/mahout/clustering/spectral/AffinityMatrixInputJob.java new file mode 100644 index 0000000..aa12b9e --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/clustering/spectral/AffinityMatrixInputJob.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.mahout.clustering.spectral;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.math.VectorWritable;
import org.apache.mahout.math.hadoop.DistributedRowMatrix;

/**
 * Static entry points for the map/reduce job that turns textual
 * (x_i, x_j, value) affinity triples into row vectors keyed by row index.
 */
public final class AffinityMatrixInputJob {

  private AffinityMatrixInputJob() {
  }

  /**
   * Configures and runs the affinity-matrix input job, blocking until it
   * finishes.  Anything already at {@code output} is deleted first, and
   * {@code rows} is published to the tasks under
   * {@link Keys#AFFINITY_DIMENSIONS}.  The {@code cols} argument is accepted
   * but not used by this method.
   *
   * @throws IllegalStateException if the job reports failure
   */
  public static void runJob(Path input, Path output, int rows, int cols)
    throws IOException, InterruptedException, ClassNotFoundException {
    Configuration jobConf = new Configuration();
    HadoopUtil.delete(jobConf, output);

    // must be set before the Job is created from this configuration
    jobConf.setInt(Keys.AFFINITY_DIMENSIONS, rows);
    String jobName = "AffinityMatrixInputJob: " + input + " -> M/R -> " + output;
    Job affinityJob = new Job(jobConf, jobName);

    // map side: row index -> single matrix entry
    affinityJob.setMapperClass(AffinityMatrixInputMapper.class);
    affinityJob.setMapOutputKeyClass(IntWritable.class);
    affinityJob.setMapOutputValueClass(DistributedRowMatrix.MatrixEntryWritable.class);

    // reduce side: row index -> assembled row vector
    affinityJob.setReducerClass(AffinityMatrixInputReducer.class);
    affinityJob.setOutputKeyClass(IntWritable.class);
    affinityJob.setOutputValueClass(VectorWritable.class);
    affinityJob.setOutputFormatClass(SequenceFileOutputFormat.class);

    FileInputFormat.addInputPath(affinityJob, input);
    FileOutputFormat.setOutputPath(affinityJob, output);

    affinityJob.setJarByClass(AffinityMatrixInputJob.class);

    if (!affinityJob.waitForCompletion(true)) {
      throw new IllegalStateException("Job failed!");
    }
  }

  /**
   * Convenience wrapper around {@link #runJob(Path, Path, int, int)} for a
   * square matrix: picks a "seqfiles-N" directory under {@code output}, runs
   * the job into it, and returns a {@link DistributedRowMatrix} over that
   * directory with a fresh {@link Configuration} attached.
   */
  public static DistributedRowMatrix runJob(Path input, Path output, int dimensions)
    throws IOException, InterruptedException, ClassNotFoundException {
    long rowDirSuffix = System.nanoTime() & 0xFF;
    Path rowDir = new Path(output, "seqfiles-" + rowDirSuffix);
    runJob(input, rowDir, dimensions, dimensions);

    long tmpDirSuffix = System.nanoTime() & 0xFF;
    Path tmpDir = new Path(rowDir, "seqtmp-" + tmpDirSuffix);
    DistributedRowMatrix affinity = new DistributedRowMatrix(rowDir, tmpDir, dimensions, dimensions);
    affinity.setConf(new Configuration());
    return affinity;
  }
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/spectral/AffinityMatrixInputMapper.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/clustering/spectral/AffinityMatrixInputMapper.java b/mr/src/main/java/org/apache/mahout/clustering/spectral/AffinityMatrixInputMapper.java new file mode 100644 index 0000000..30d2404 --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/clustering/spectral/AffinityMatrixInputMapper.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.mahout.clustering.spectral;

import java.io.IOException;
import java.util.regex.Pattern;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.math.hadoop.DistributedRowMatrix;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * <p>Handles reading the files representing the affinity matrix. Since the affinity
 * matrix is representative of a graph, each line in all the files should
 * take the form:</p>
 *
 * {@code i,j,value}
 *
 * <p>where {@code i} and {@code j} are the {@code i}th and
 * {@code j}th data points in the entire set, and {@code value}
 * represents some measurement of their relative absolute magnitudes. This
 * is, simply, a method for representing a graph textually.</p>
 */
public class AffinityMatrixInputMapper
    extends Mapper<LongWritable, Text, IntWritable, DistributedRowMatrix.MatrixEntryWritable> {

  private static final Logger log = LoggerFactory.getLogger(AffinityMatrixInputMapper.class);

  private static final Pattern COMMA_PATTERN = Pattern.compile(",");

  /**
   * Parses one {@code i,j,value} line and emits {@code i} as the key with a
   * matrix entry carrying column {@code j} and the value.  Whitespace around
   * each field is ignored.
   *
   * @throws IOException if the line does not have exactly three non-empty
   *         fields, or a field is not a valid number
   */
  @Override
  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

    String[] elements = COMMA_PATTERN.split(value.toString());
    log.debug("(DEBUG - MAP) Key[{}], Value[{}]", key.get(), value);

    // enforce well-formed textual representation of the graph
    if (elements.length != 3) {
      throw new IOException("Expected input of length 3, received "
                            + elements.length + ". Please make sure you adhere to "
                            + "the structure of (i,j,value) for representing a graph in text. "
                            + "Input line was: '" + value + "'.");
    }
    // tolerate "1, 2, 0.5": Integer parsing does not accept surrounding whitespace
    for (int i = 0; i < elements.length; i++) {
      elements[i] = elements[i].trim();
    }
    if (elements[0].isEmpty() || elements[1].isEmpty() || elements[2].isEmpty()) {
      throw new IOException("Found an element of 0 length. Please be sure you adhere to the structure of "
          + "(i,j,value) for representing a graph in text.");
    }

    int rowIndex;
    int colIndex;
    double entryValue;
    try {
      rowIndex = Integer.parseInt(elements[0]);
      colIndex = Integer.parseInt(elements[1]);
      entryValue = Double.parseDouble(elements[2]);
    } catch (NumberFormatException e) {
      // report malformed numbers the same way as the other format violations,
      // keeping the offending line and the original cause
      throw new IOException("Could not parse the numbers of an (i,j,value) entry. "
          + "Input line was: '" + value + "'.", e);
    }

    // parse the line of text into a DistributedRowMatrix entry,
    // making the row (elements[0]) the key to the Reducer, and
    // setting the column (elements[1]) in the entry itself
    DistributedRowMatrix.MatrixEntryWritable toAdd = new DistributedRowMatrix.MatrixEntryWritable();
    IntWritable row = new IntWritable(rowIndex);
    toAdd.setRow(-1); // already set as the Reducer's key
    toAdd.setCol(colIndex);
    toAdd.setVal(entryValue);
    context.write(row, toAdd);
  }
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/spectral/AffinityMatrixInputReducer.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/clustering/spectral/AffinityMatrixInputReducer.java b/mr/src/main/java/org/apache/mahout/clustering/spectral/AffinityMatrixInputReducer.java new file mode 100644 index 0000000..d892969 --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/clustering/spectral/AffinityMatrixInputReducer.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
* See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.mahout.clustering.spectral;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.SequentialAccessSparseVector;
import org.apache.mahout.math.VectorWritable;
import org.apache.mahout.math.hadoop.DistributedRowMatrix;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Gathers all matrix entries that share a row key into one sparse vector
 * (entry column -> entry value) and writes that vector out under the same
 * row key it was received with.
 */
public class AffinityMatrixInputReducer
    extends Reducer<IntWritable, DistributedRowMatrix.MatrixEntryWritable, IntWritable, VectorWritable> {

  private static final Logger log = LoggerFactory.getLogger(AffinityMatrixInputReducer.class);

  /**
   * Builds the row vector for {@code row}.  The vector's cardinality is read
   * from the job configuration under {@link Keys#AFFINITY_DIMENSIONS},
   * falling back to {@code Integer.MAX_VALUE} when that key is absent.
   */
  @Override
  protected void reduce(IntWritable row, Iterable<DistributedRowMatrix.MatrixEntryWritable> values, Context context)
    throws IOException, InterruptedException {
    int cardinality = context.getConfiguration().getInt(Keys.AFFINITY_DIMENSIONS, Integer.MAX_VALUE);
    // filled in arbitrary column order, so collect into a random-access vector first
    RandomAccessSparseVector rowEntries = new RandomAccessSparseVector(cardinality, 100);

    for (DistributedRowMatrix.MatrixEntryWritable entry : values) {
      int column = entry.getCol();
      double weight = entry.getVal();
      rowEntries.setQuick(column, weight);
      if (log.isDebugEnabled()) {
        log.debug("(DEBUG - REDUCE) Row[{}], Column[{}], Value[{}]",
                  row.get(), column, weight);
      }
    }

    context.write(row, new VectorWritable(new SequentialAccessSparseVector(rowEntries)));
  }
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/spectral/IntDoublePairWritable.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/clustering/spectral/IntDoublePairWritable.java b/mr/src/main/java/org/apache/mahout/clustering/spectral/IntDoublePairWritable.java new file mode 100644 index 0000000..593cc58 --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/clustering/spectral/IntDoublePairWritable.java @@ -0,0 +1,75 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.mahout.clustering.spectral;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * A {@link Writable} pair of one {@code int} key and one {@code double}
 * value: a specialized, serializable stand-in for the generic
 * mahout.common.Pair class, whose type parameters would otherwise each have
 * to implement Writable themselves.
 *
 * In essence, an instance can be treated as a single Vector Element
 * (index plus value).
 */
public class IntDoublePairWritable implements Writable {

  private int key;
  private double value;

  /** Creates an empty pair; the fields are filled by the setters or by {@link #readFields(DataInput)}. */
  public IntDoublePairWritable() {
  }

  /**
   * @param k the int key
   * @param v the double value
   */
  public IntDoublePairWritable(int k, double v) {
    this.key = k;
    this.value = v;
  }

  public int getKey() {
    return key;
  }

  public double getValue() {
    return value;
  }

  public void setKey(int newKey) {
    this.key = newKey;
  }

  public void setValue(double newValue) {
    this.value = newValue;
  }

  /** Serializes the pair as the int key followed by the double value. */
  @Override
  public void write(DataOutput out) throws IOException {
    out.writeInt(key);
    out.writeDouble(value);
  }

  /** Restores the pair in the order used by {@link #write(DataOutput)}: key first, then value. */
  @Override
  public void readFields(DataInput in) throws IOException {
    this.key = in.readInt();
    this.value = in.readDouble();
  }

}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/spectral/Keys.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/clustering/spectral/Keys.java b/mr/src/main/java/org/apache/mahout/clustering/spectral/Keys.java new file mode 100644 index 0000000..268a365 --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/clustering/spectral/Keys.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.mahout.clustering.spectral;

/**
 * Constants shared by the spectral clustering jobs. This class only holds
 * constants and cannot be instantiated.
 */
public final class Keys {

  /**
   * Index used as the SequenceFile key of the diagonal vector; it is the
   * {@code IntWritable} key under which VectorMatrixMultiplicationJob saves
   * the diagonal through VectorCache.
   */
  public static final int DIAGONAL_CACHE_INDEX = 1;

  /**
   * Configuration property holding the dimension of the (square) affinity
   * matrix. It is set by the job drivers and read with {@code getInt} by the
   * reducers to size the vectors they build.
   */
  public static final String AFFINITY_DIMENSIONS = "org.apache.mahout.clustering.spectral.common.affinitydimensions";

  private Keys() {}

}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/spectral/MatrixDiagonalizeJob.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/clustering/spectral/MatrixDiagonalizeJob.java b/mr/src/main/java/org/apache/mahout/clustering/spectral/MatrixDiagonalizeJob.java new file mode 100644 index 0000000..f245f99 --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/clustering/spectral/MatrixDiagonalizeJob.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */

package org.apache.mahout.clustering.spectral;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

/**
 * Given a matrix, this job returns a vector whose i_th element is the
 * sum of all the elements in the i_th row of the original matrix.
 */
public final class MatrixDiagonalizeJob {

  private MatrixDiagonalizeJob() {
  }

  /**
   * Runs the row-sum job and loads its result.
   *
   * @param affInput   path to the matrix rows (SequenceFile of IntWritable / VectorWritable);
   *                   the result is written to a "diagonal" directory next to it, which is
   *                   deleted first if it already exists
   * @param dimensions size of the resulting vector; passed to the reducer through
   *                   {@link Keys#AFFINITY_DIMENSIONS}
   * @return the vector of row sums read back from the job output
   * @throws IllegalStateException if the MapReduce job does not complete successfully
   */
  public static Vector runJob(Path affInput, int dimensions)
    throws IOException, ClassNotFoundException, InterruptedException {

    // set up all the job tasks
    Configuration conf = new Configuration();
    Path diagOutput = new Path(affInput.getParent(), "diagonal");
    HadoopUtil.delete(conf, diagOutput);
    conf.setInt(Keys.AFFINITY_DIMENSIONS, dimensions);
    Job job = new Job(conf, "MatrixDiagonalizeJob");

    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setMapOutputKeyClass(NullWritable.class);
    job.setMapOutputValueClass(IntDoublePairWritable.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(VectorWritable.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    job.setMapperClass(MatrixDiagonalizeMapper.class);
    job.setReducerClass(MatrixDiagonalizeReducer.class);

    FileInputFormat.addInputPath(job, affInput);
    FileOutputFormat.setOutputPath(job, diagOutput);

    job.setJarByClass(MatrixDiagonalizeJob.class);

    boolean succeeded = job.waitForCompletion(true);
    if (!succeeded) {
      throw new IllegalStateException("Job failed!");
    }

    // read the results back from the path
    return VectorCache.load(conf, new Path(diagOutput, "part-r-00000"));
  }

  /**
   * Emits, for every input row, the pair (row index, sum of the row's elements)
   * under a single {@link NullWritable} key so that all pairs reach one reduce call.
   */
  public static class MatrixDiagonalizeMapper
    extends Mapper<IntWritable, VectorWritable, NullWritable, IntDoublePairWritable> {

    @Override
    protected void map(IntWritable key, VectorWritable row, Context context)
      throws IOException, InterruptedException {
      // store the sum
      IntDoublePairWritable store = new IntDoublePairWritable(key.get(), row.get().zSum());
      context.write(NullWritable.get(), store);
    }
  }

  /**
   * Places every (index, sum) pair into a dense vector whose size is read from
   * {@link Keys#AFFINITY_DIMENSIONS}, and writes that vector out.
   */
  public static class MatrixDiagonalizeReducer
    extends Reducer<NullWritable, IntDoublePairWritable, NullWritable, VectorWritable> {

    /**
     * @throws IllegalStateException if {@link Keys#AFFINITY_DIMENSIONS} is not set to a
     *         non-negative value in the job configuration
     */
    @Override
    protected void reduce(NullWritable key, Iterable<IntDoublePairWritable> values,
      Context context) throws IOException, InterruptedException {
      // A dense vector allocates its full cardinality up front, so a missing setting must
      // be reported rather than replaced by a huge default size.
      int dimensions = context.getConfiguration().getInt(Keys.AFFINITY_DIMENSIONS, -1);
      if (dimensions < 0) {
        throw new IllegalStateException(
            "Configuration property " + Keys.AFFINITY_DIMENSIONS + " must be set to the matrix dimension");
      }
      // create the return vector
      Vector retval = new DenseVector(dimensions);
      // put everything in its correct spot
      for (IntDoublePairWritable e : values) {
        retval.setQuick(e.getKey(), e.getValue());
      }
      // write it out
      context.write(key, new VectorWritable(retval));
    }
  }
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/spectral/UnitVectorizerJob.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/clustering/spectral/UnitVectorizerJob.java b/mr/src/main/java/org/apache/mahout/clustering/spectral/UnitVectorizerJob.java new file mode 100644 index 0000000..56cb237 --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/clustering/spectral/UnitVectorizerJob.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.mahout.clustering.spectral;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.mahout.math.VectorWritable;

/**
 * <p>Map-only job that rescales every row of a DistributedRowMatrix to unit
 * length. With input matrix U and output matrix W:</p>
 *
 * <p>{@code w_ij = u_ij / sqrt(sum_j(u_ij * u_ij))}</p>
 */
public final class UnitVectorizerJob {

  private UnitVectorizerJob() {
  }

  /**
   * Normalizes each row found under {@code input} and writes the result,
   * keyed by the same row index, to {@code output}.
   *
   * @param input  path to the rows to normalize (SequenceFile input)
   * @param output path that receives the normalized rows (SequenceFile output)
   * @throws IllegalStateException if the MapReduce job does not complete successfully
   */
  public static void runJob(Path input, Path output)
    throws IOException, InterruptedException, ClassNotFoundException {

    Job vectorizer = new Job(new Configuration(), "UnitVectorizerJob");

    // Input/output formats and types.
    vectorizer.setInputFormatClass(SequenceFileInputFormat.class);
    vectorizer.setOutputKeyClass(IntWritable.class);
    vectorizer.setOutputValueClass(VectorWritable.class);
    vectorizer.setOutputFormatClass(SequenceFileOutputFormat.class);

    // Map-only: each row is normalized independently, no reduce step.
    vectorizer.setMapperClass(UnitVectorizerMapper.class);
    vectorizer.setNumReduceTasks(0);

    FileInputFormat.addInputPath(vectorizer, input);
    FileOutputFormat.setOutputPath(vectorizer, output);

    vectorizer.setJarByClass(UnitVectorizerJob.class);

    if (!vectorizer.waitForCompletion(true)) {
      throw new IllegalStateException("Job failed!");
    }
  }

  /** Emits each incoming row under its original key after normalizing it with power 2. */
  public static class UnitVectorizerMapper
    extends Mapper<IntWritable, VectorWritable, IntWritable, VectorWritable> {

    @Override
    protected void map(IntWritable row, VectorWritable vector, Context context)
      throws IOException, InterruptedException {
      VectorWritable normalized = new VectorWritable(vector.get().normalize(2));
      context.write(row, normalized);
    }

  }
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/spectral/VectorCache.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/clustering/spectral/VectorCache.java b/mr/src/main/java/org/apache/mahout/clustering/spectral/VectorCache.java new file mode 100644 index 0000000..60e0a2e --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/clustering/spectral/VectorCache.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.mahout.clustering.spectral;

import java.io.IOException;
import java.net.URI;
import java.util.Arrays;

import com.google.common.io.Closeables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterator;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * This class handles reading and writing vectors to the Hadoop
 * distributed cache. Created as a result of Eigencuts' liberal use
 * of such functionality, but available to any algorithm requiring it.
 */
public final class VectorCache {

  private static final Logger log = LoggerFactory.getLogger(VectorCache.class);

  private VectorCache() {
  }

  /**
   * Writes a single (key, vector) record to {@code output} and registers that
   * file as the only distributed-cache file of {@code conf}.
   *
   * @param key SequenceFile key; the writer is created with {@code IntWritable}
   *            as its key class
   * @param vector Vector to save, to be wrapped as VectorWritable
   * @param output destination file; it is qualified against its file system first
   * @param conf configuration that receives the cache-file setting
   * @param overwritePath if true, {@code output} is deleted before writing
   * @param deleteOnExit if true, {@code output} is marked for deletion on exit
   */
  public static void save(Writable key,
                          Vector vector,
                          Path output,
                          Configuration conf,
                          boolean overwritePath,
                          boolean deleteOnExit) throws IOException {

    FileSystem fs = FileSystem.get(output.toUri(), conf);
    output = fs.makeQualified(output);
    if (overwritePath) {
      HadoopUtil.delete(conf, output);
    }

    // set the cache
    DistributedCache.setCacheFiles(new URI[]{output.toUri()}, conf);

    // set up the writer
    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, output,
        IntWritable.class, VectorWritable.class);
    try {
      writer.append(key, new VectorWritable(vector));
    } finally {
      Closeables.close(writer, false);
    }

    if (deleteOnExit) {
      fs.deleteOnExit(output);
    }
  }

  /**
   * Calls the save() method, setting the cache to overwrite any previous
   * Path and to delete the path after exiting
   */
  public static void save(Writable key, Vector vector, Path output, Configuration conf) throws IOException {
    save(key, vector, output, conf, true, true);
  }

  /**
   * Loads the vector from the single file registered in the {@link DistributedCache}.
   *
   * @return the first vector stored in the cached file, or null if that file
   *         contains no vector
   * @throws IOException if the cache does not hold exactly one file
   */
  public static Vector load(Configuration conf) throws IOException {
    Path[] files = HadoopUtil.getCachedFiles(conf);

    if (files.length != 1) {
      throw new IOException("Cannot read vector from Distributed Cache: expected exactly 1 cached file but found "
          + files.length);
    }

    if (log.isInfoEnabled()) {
      log.info("Files are: {}", Arrays.toString(files));
    }
    return load(conf, files[0]);
  }

  /**
   * Loads a Vector from the specified path. Only the first record of the file is read.
   *
   * @return the first vector in the file, or null if the file holds no records
   */
  public static Vector load(Configuration conf, Path input) throws IOException {
    log.info("Loading vector from: {}", input);
    SequenceFileValueIterator<VectorWritable> iterator =
        new SequenceFileValueIterator<>(input, true, conf);
    try {
      // Guard the read so an empty file yields null (as documented, and as callers check for).
      return iterator.hasNext() ? iterator.next().get() : null;
    } finally {
      Closeables.close(iterator, true);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/spectral/VectorMatrixMultiplicationJob.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/clustering/spectral/VectorMatrixMultiplicationJob.java b/mr/src/main/java/org/apache/mahout/clustering/spectral/VectorMatrixMultiplicationJob.java new file mode 100644 index 0000000..c42ab70 --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/clustering/spectral/VectorMatrixMultiplicationJob.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */

package org.apache.mahout.clustering.spectral;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
import org.apache.mahout.math.function.Functions;
import org.apache.mahout.math.hadoop.DistributedRowMatrix;

/**
 * <p>This class handles the three-way multiplication of the diagonal matrix
 * and the Markov transition matrix inherent in the Eigencuts algorithm.
 * The equation takes the form:</p>
 *
 * {@code W = D^(1/2) * M * D^(1/2)}
 *
 * <p>Since the diagonal matrix D has only n non-zero elements, it is represented
 * as a dense vector in this job, rather than a full n-by-n matrix. This job
 * performs the multiplications and returns the new DRM.</p>
 */
public final class VectorMatrixMultiplicationJob {

  private VectorMatrixMultiplicationJob() {
  }

  /**
   * Invokes the job, using a "tmp" directory below {@code outputPath} as the
   * temporary path of the returned matrix.
   *
   * @param markovPath Path to the markov DRM's sequence files
   * @param diag the diagonal of D, one entry per row/column of the matrix
   * @param outputPath where the resulting rows are written
   */
  public static DistributedRowMatrix runJob(Path markovPath, Vector diag, Path outputPath)
    throws IOException, ClassNotFoundException, InterruptedException {

    return runJob(markovPath, diag, outputPath, new Path(outputPath, "tmp"));
  }

  /**
   * Invokes the job. The diagonal is saved through {@link VectorCache} to a
   * "vector" file next to {@code outputPath} so that the mappers can load it.
   *
   * @param markovPath Path to the markov DRM's sequence files
   * @param diag the diagonal of D, one entry per row/column of the matrix
   * @param outputPath where the resulting rows are written
   * @param tmpPath temporary path handed to the returned matrix
   * @return a {@code diag.size()} by {@code diag.size()} matrix backed by {@code outputPath}
   * @throws IllegalStateException if the MapReduce job does not complete successfully
   */
  public static DistributedRowMatrix runJob(Path markovPath, Vector diag, Path outputPath, Path tmpPath)
    throws IOException, ClassNotFoundException, InterruptedException {

    // set up the serialization of the diagonal vector
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(markovPath.toUri(), conf);
    markovPath = fs.makeQualified(markovPath);
    outputPath = fs.makeQualified(outputPath);
    Path vectorOutputPath = new Path(outputPath.getParent(), "vector");
    VectorCache.save(new IntWritable(Keys.DIAGONAL_CACHE_INDEX), diag, vectorOutputPath, conf);

    // set up the job itself
    Job job = new Job(conf, "VectorMatrixMultiplication");
    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(VectorWritable.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    job.setMapperClass(VectorMatrixMultiplicationMapper.class);
    job.setNumReduceTasks(0);

    FileInputFormat.addInputPath(job, markovPath);
    FileOutputFormat.setOutputPath(job, outputPath);

    job.setJarByClass(VectorMatrixMultiplicationJob.class);

    boolean succeeded = job.waitForCompletion(true);
    if (!succeeded) {
      throw new IllegalStateException("Job failed!");
    }

    // build the resulting DRM from the results
    return new DistributedRowMatrix(outputPath, tmpPath,
                                    diag.size(), diag.size());
  }

  /**
   * Scales every element m_ij of an incoming row in place to
   * {@code sqrt(d_i) * m_ij * sqrt(d_j)} and emits the row under its original key.
   */
  public static class VectorMatrixMultiplicationMapper
    extends Mapper<IntWritable, VectorWritable, IntWritable, VectorWritable> {

    private Vector diagonal;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      // read in the diagonal vector from the distributed cache
      super.setup(context);
      Configuration config = context.getConfiguration();
      diagonal = VectorCache.load(config);
      if (diagonal == null) {
        throw new IOException("No vector loaded from cache!");
      }
      if (!(diagonal instanceof DenseVector)) {
        diagonal = new DenseVector(diagonal);
      }
    }

    @Override
    protected void map(IntWritable key, VectorWritable row, Context ctx)
      throws IOException, InterruptedException {

      // The row factor depends only on the key, so compute it once per row.
      double dii = Functions.SQRT.apply(diagonal.get(key.get()));
      for (Vector.Element e : row.get().all()) {
        double djj = Functions.SQRT.apply(diagonal.get(e.index()));
        double mij = e.get();
        e.set(dii * mij * djj);
      }
      ctx.write(key, row);
    }

    /**
     * Performs the setup of the Mapper. Used by unit tests.
     * @param diag the diagonal vector to use instead of loading one from the distributed cache
     */
    void setup(Vector diag) {
      this.diagonal = diag;
    }
  }
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/spectral/VertexWritable.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/clustering/spectral/VertexWritable.java b/mr/src/main/java/org/apache/mahout/clustering/spectral/VertexWritable.java new file mode 100644 index 0000000..0d70cac --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/clustering/spectral/VertexWritable.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.mahout.clustering.spectral;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * Represents a vertex within the affinity graph for Eigencuts.
 *
 * <p>Serialized as row, column, value and type, in that order. A null type is
 * written as an empty string, so it reads back as {@code ""} rather than null.</p>
 */
public class VertexWritable implements Writable {

  /** the row */
  private int i;

  /** the column */
  private int j;

  /** the value at this vertex */
  private double value;

  /** an extra type delimiter; may be null */
  private String type;

  public VertexWritable() {
  }

  /**
   * @param i the row
   * @param j the column
   * @param v the value at this vertex
   * @param t the type delimiter; may be null
   */
  public VertexWritable(int i, int j, double v, String t) {
    this.i = i;
    this.j = j;
    this.value = v;
    this.type = t;
  }

  public int getRow() {
    return i;
  }

  public void setRow(int i) {
    this.i = i;
  }

  public int getCol() {
    return j;
  }

  public void setCol(int j) {
    this.j = j;
  }

  public double getValue() {
    return value;
  }

  public void setValue(double v) {
    this.value = v;
  }

  public String getType() {
    return type;
  }

  public void setType(String t) {
    this.type = t;
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    this.i = in.readInt();
    this.j = in.readInt();
    this.value = in.readDouble();
    this.type = in.readUTF();
  }

  @Override
  public void write(DataOutput out) throws IOException {
    out.writeInt(i);
    out.writeInt(j);
    out.writeDouble(value);
    // writeUTF rejects null; the type is optional, so an absent type is stored as "".
    out.writeUTF(type == null ? "" : type);
  }

}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/spectral/kmeans/EigenSeedGenerator.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/clustering/spectral/kmeans/EigenSeedGenerator.java b/mr/src/main/java/org/apache/mahout/clustering/spectral/kmeans/EigenSeedGenerator.java new file mode 100644 index 0000000..5f9c1a6 --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/clustering/spectral/kmeans/EigenSeedGenerator.java @@ -0,0 +1,124 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.mahout.clustering.spectral.kmeans;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.mahout.clustering.iterator.ClusterWritable;
import org.apache.mahout.clustering.kmeans.Kluster;
import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.common.Pair;
import org.apache.mahout.common.distance.DistanceMeasure;
import org.apache.mahout.common.iterator.sequencefile.PathFilters;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Maps;
import com.google.common.io.Closeables;

/**
 * Given an Input Path containing a {@link org.apache.hadoop.io.SequenceFile}, select vectors and write them to the
 * output file as a {@link org.apache.mahout.clustering.kmeans.Kluster} representing the initial centroid to use. The
 * selection criterion is the rows with max value in that respective column
 */
public final class EigenSeedGenerator {

  private static final Logger log = LoggerFactory.getLogger(EigenSeedGenerator.class);

  public static final String K = "k";

  private EigenSeedGenerator() {}

  /**
   * Scans every vector under {@code input} and, for each column index that has a
   * non-zero entry, keeps the vector with the largest absolute value in that column.
   * Each kept vector is written to {@code output/part-eigenSeed} as a {@link Kluster}
   * whose id is the column index.
   *
   * @param conf    the Configuration to be used
   * @param input   a sequence file, or a directory whose files are scanned
   * @param output  output directory; deleted before the seed file is created
   * @param k       used only as the expected size of the internal maps
   * @param measure the DistanceMeasure given to each created Kluster
   * @return the path of the seed file
   */
  public static Path buildFromEigens(Configuration conf, Path input, Path output, int k, DistanceMeasure measure)
    throws IOException {
    // delete the output directory
    FileSystem fs = FileSystem.get(output.toUri(), conf);
    HadoopUtil.delete(conf, output);
    Path outFile = new Path(output, "part-eigenSeed");
    boolean newFile = fs.createNewFile(outFile);
    if (newFile) {
      Path inputPathPattern;

      if (fs.getFileStatus(input).isDir()) {
        inputPathPattern = new Path(input, "*");
      } else {
        inputPathPattern = input;
      }

      FileStatus[] inputFiles = fs.globStatus(inputPathPattern, PathFilters.logsCRCFilter());
      // largest absolute value seen so far in each column
      Map<Integer,Double> maxEigens = Maps.newHashMapWithExpectedSize(k);
      Map<Integer,Text> chosenTexts = Maps.newHashMapWithExpectedSize(k);
      Map<Integer,ClusterWritable> chosenClusters = Maps.newHashMapWithExpectedSize(k);

      for (FileStatus fileStatus : inputFiles) {
        if (!fileStatus.isDir()) {
          for (Pair<Writable,VectorWritable> record : new SequenceFileIterable<Writable,VectorWritable>(
              fileStatus.getPath(), true, conf)) {
            Writable key = record.getFirst();
            VectorWritable value = record.getSecond();

            for (Vector.Element e : value.get().nonZeroes()) {
              int index = e.index();
              double v = Math.abs(e.get());

              Double currentMax = maxEigens.get(index);
              if (currentMax == null || v > currentMax) {
                maxEigens.put(index, v);
                Text newText = new Text(key.toString());
                chosenTexts.put(index, newText);
                Kluster newCluster = new Kluster(value.get(), index, measure);
                newCluster.observe(value.get(), 1);
                ClusterWritable clusterWritable = new ClusterWritable();
                clusterWritable.setValue(newCluster);
                chosenClusters.put(index, clusterWritable);
              }
            }
          }
        }
      }

      // Open the writer only once the input has been read, so that a failure while
      // scanning cannot leave an unclosed writer behind.
      SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, outFile, Text.class, ClusterWritable.class);
      try {
        for (Integer key : maxEigens.keySet()) {
          writer.append(chosenTexts.get(key), chosenClusters.get(key));
        }
        log.info("EigenSeedGenerator:: Wrote {} Klusters to {}", chosenTexts.size(), outFile);
      } finally {
        Closeables.close(writer, false);
      }
    }

    return outFile;
  }

}
http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java ---------------------------------------------------------------------- diff --git a/mr/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java b/mr/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java new file mode 100644 index 0000000..427de91 --- /dev/null +++ b/mr/src/main/java/org/apache/mahout/clustering/spectral/kmeans/SpectralKMeansDriver.java @@ -0,0 +1,243 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.mahout.clustering.spectral.kmeans; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.util.ToolRunner; +import org.apache.mahout.clustering.Cluster; +import org.apache.mahout.clustering.classify.WeightedVectorWritable; +import org.apache.mahout.clustering.kmeans.KMeansDriver; +import org.apache.mahout.clustering.spectral.AffinityMatrixInputJob; +import org.apache.mahout.clustering.spectral.MatrixDiagonalizeJob; +import org.apache.mahout.clustering.spectral.UnitVectorizerJob; +import org.apache.mahout.clustering.spectral.VectorMatrixMultiplicationJob; +import org.apache.mahout.common.AbstractJob; +import org.apache.mahout.common.ClassUtils; +import org.apache.mahout.common.HadoopUtil; +import org.apache.mahout.common.Pair; +import org.apache.mahout.common.commandline.DefaultOptionCreator; +import org.apache.mahout.common.distance.DistanceMeasure; +import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.hadoop.DistributedRowMatrix; +import org.apache.mahout.math.hadoop.stochasticsvd.SSVDSolver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Performs spectral k-means clustering on the top k eigenvectors of the input affinity matrix. 
+ */ +public class SpectralKMeansDriver extends AbstractJob { + private static final Logger log = LoggerFactory.getLogger(SpectralKMeansDriver.class); + + public static final int REDUCERS = 10; + public static final int BLOCKHEIGHT = 30000; + public static final int OVERSAMPLING = 15; + public static final int POWERITERS = 0; + + public static void main(String[] args) throws Exception { + ToolRunner.run(new SpectralKMeansDriver(), args); + } + + @Override + public int run(String[] arg0) throws Exception { + + Configuration conf = getConf(); + addInputOption(); + addOutputOption(); + addOption("dimensions", "d", "Square dimensions of affinity matrix", true); + addOption("clusters", "k", "Number of clusters and top eigenvectors", true); + addOption(DefaultOptionCreator.distanceMeasureOption().create()); + addOption(DefaultOptionCreator.convergenceOption().create()); + addOption(DefaultOptionCreator.maxIterationsOption().create()); + addOption(DefaultOptionCreator.overwriteOption().create()); + addFlag("usessvd", "ssvd", "Uses SSVD as the eigensolver. 
Default is the Lanczos solver."); + addOption("reduceTasks", "t", "Number of reducers for SSVD", String.valueOf(REDUCERS)); + addOption("outerProdBlockHeight", "oh", "Block height of outer products for SSVD", String.valueOf(BLOCKHEIGHT)); + addOption("oversampling", "p", "Oversampling parameter for SSVD", String.valueOf(OVERSAMPLING)); + addOption("powerIter", "q", "Additional power iterations for SSVD", String.valueOf(POWERITERS)); + + Map<String, List<String>> parsedArgs = parseArguments(arg0); + if (parsedArgs == null) { + return 0; + } + + Path input = getInputPath(); + Path output = getOutputPath(); + if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) { + HadoopUtil.delete(conf, getTempPath()); + HadoopUtil.delete(conf, getOutputPath()); + } + int numDims = Integer.parseInt(getOption("dimensions")); + int clusters = Integer.parseInt(getOption("clusters")); + String measureClass = getOption(DefaultOptionCreator.DISTANCE_MEASURE_OPTION); + DistanceMeasure measure = ClassUtils.instantiateAs(measureClass, DistanceMeasure.class); + double convergenceDelta = Double.parseDouble(getOption(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION)); + int maxIterations = Integer.parseInt(getOption(DefaultOptionCreator.MAX_ITERATIONS_OPTION)); + + Path tempdir = new Path(getOption("tempDir")); + int reducers = Integer.parseInt(getOption("reduceTasks")); + int blockheight = Integer.parseInt(getOption("outerProdBlockHeight")); + int oversampling = Integer.parseInt(getOption("oversampling")); + int poweriters = Integer.parseInt(getOption("powerIter")); + run(conf, input, output, numDims, clusters, measure, convergenceDelta, maxIterations, tempdir, reducers, + blockheight, oversampling, poweriters); + + return 0; + } + + public static void run(Configuration conf, Path input, Path output, int numDims, int clusters, + DistanceMeasure measure, double convergenceDelta, int maxIterations, Path tempDir) + throws IOException, InterruptedException, ClassNotFoundException { + run(conf, 
input, output, numDims, clusters, measure, convergenceDelta, maxIterations, tempDir, REDUCERS, + BLOCKHEIGHT, OVERSAMPLING, POWERITERS); + } + + /** + * Run the Spectral KMeans clustering on the supplied arguments + * + * @param conf + * the Configuration to be used + * @param input + * the Path to the input tuples directory + * @param output + * the Path to the output directory + * @param numDims + * the int number of dimensions of the affinity matrix + * @param clusters + * the int number of eigenvectors and thus clusters to produce + * @param measure + * the DistanceMeasure for the k-Means calculations + * @param convergenceDelta + * the double convergence delta for the k-Means calculations + * @param maxIterations + * the int maximum number of iterations for the k-Means calculations + * @param tempDir + * Temporary directory for intermediate calculations + * @param numReducers + * Number of reducers + * @param blockHeight + * @param oversampling + * @param poweriters + */ + public static void run(Configuration conf, Path input, Path output, int numDims, int clusters, + DistanceMeasure measure, double convergenceDelta, int maxIterations, Path tempDir, + int numReducers, int blockHeight, int oversampling, int poweriters) + throws IOException, InterruptedException, ClassNotFoundException { + + HadoopUtil.delete(conf, tempDir); + Path outputCalc = new Path(tempDir, "calculations"); + Path outputTmp = new Path(tempDir, "temporary"); + + // Take in the raw CSV text file and split it ourselves, + // creating our own SequenceFiles for the matrices to read later + // (similar to the style of syntheticcontrol.canopy.InputMapper) + Path affSeqFiles = new Path(outputCalc, "seqfile"); + AffinityMatrixInputJob.runJob(input, affSeqFiles, numDims, numDims); + + // Construct the affinity matrix using the newly-created sequence files + DistributedRowMatrix A = new DistributedRowMatrix(affSeqFiles, new Path(outputTmp, "afftmp"), numDims, numDims); + + Configuration depConf = new 
Configuration(conf); + A.setConf(depConf); + + // Construct the diagonal matrix D (represented as a vector) + Vector D = MatrixDiagonalizeJob.runJob(affSeqFiles, numDims); + + // Calculate the normalized Laplacian of the form: L = D^(-0.5)AD^(-0.5) + DistributedRowMatrix L = VectorMatrixMultiplicationJob.runJob(affSeqFiles, D, new Path(outputCalc, "laplacian"), + new Path(outputCalc, outputCalc)); + L.setConf(depConf); + + Path data; + + // SSVD requires an array of Paths to function. So we pass in an array of length one + Path[] LPath = new Path[1]; + LPath[0] = L.getRowPath(); + + Path SSVDout = new Path(outputCalc, "SSVD"); + + SSVDSolver solveIt = new SSVDSolver(depConf, LPath, SSVDout, blockHeight, clusters, oversampling, numReducers); + + solveIt.setComputeV(false); + solveIt.setComputeU(true); + solveIt.setOverwrite(true); + solveIt.setQ(poweriters); + // solveIt.setBroadcast(false); + solveIt.run(); + data = new Path(solveIt.getUPath()); + + // Normalize the rows of Wt to unit length + // normalize is important because it reduces the occurrence of two unique clusters combining into one + Path unitVectors = new Path(outputCalc, "unitvectors"); + + UnitVectorizerJob.runJob(data, unitVectors); + + DistributedRowMatrix Wt = new DistributedRowMatrix(unitVectors, new Path(unitVectors, "tmp"), clusters, numDims); + Wt.setConf(depConf); + data = Wt.getRowPath(); + + // Generate initial clusters using EigenSeedGenerator which picks rows as centroids if that row contains max + // eigen value in that column + Path initialclusters = EigenSeedGenerator.buildFromEigens(conf, data, + new Path(output, Cluster.INITIAL_CLUSTERS_DIR), clusters, measure); + + // Run the KMeansDriver + Path answer = new Path(output, "kmeans_out"); + KMeansDriver.run(conf, data, initialclusters, answer, convergenceDelta, maxIterations, true, 0.0, false); + + // Restore name to id mapping and read through the cluster assignments + Path mappingPath = new Path(new Path(conf.get("hadoop.tmp.dir")), 
"generic_input_mapping"); + List<String> mapping = new ArrayList<>(); + FileSystem fs = FileSystem.get(mappingPath.toUri(), conf); + if (fs.exists(mappingPath)) { + SequenceFile.Reader reader = new SequenceFile.Reader(fs, mappingPath, conf); + Text mappingValue = new Text(); + IntWritable mappingIndex = new IntWritable(); + while (reader.next(mappingIndex, mappingValue)) { + String s = mappingValue.toString(); + mapping.add(s); + } + HadoopUtil.delete(conf, mappingPath); + } else { + log.warn("generic input mapping file not found!"); + } + + Path clusteredPointsPath = new Path(answer, "clusteredPoints"); + Path inputPath = new Path(clusteredPointsPath, "part-m-00000"); + int id = 0; + for (Pair<IntWritable, WeightedVectorWritable> record : + new SequenceFileIterable<IntWritable, WeightedVectorWritable>(inputPath, conf)) { + if (!mapping.isEmpty()) { + log.info("{}: {}", mapping.get(id++), record.getFirst().get()); + } else { + log.info("{}: {}", id++, record.getFirst().get()); + } + } + } +}
