http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/clustering/ClusteringUtils.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/clustering/ClusteringUtils.java b/community/mahout-mr/src/main/java/org/apache/mahout/clustering/ClusteringUtils.java new file mode 100644 index 0000000..ad0f8ec --- /dev/null +++ b/community/mahout-mr/src/main/java/org/apache/mahout/clustering/ClusteringUtils.java @@ -0,0 +1,306 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.clustering; + +import java.util.ArrayList; +import java.util.List; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import org.apache.mahout.common.distance.DistanceMeasure; +import org.apache.mahout.common.distance.EuclideanDistanceMeasure; +import org.apache.mahout.math.Centroid; +import org.apache.mahout.math.DenseMatrix; +import org.apache.mahout.math.Matrix; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.WeightedVector; +import org.apache.mahout.math.neighborhood.BruteSearch; +import org.apache.mahout.math.neighborhood.ProjectionSearch; +import org.apache.mahout.math.neighborhood.Searcher; +import org.apache.mahout.math.neighborhood.UpdatableSearcher; +import org.apache.mahout.math.random.WeightedThing; +import org.apache.mahout.math.stats.OnlineSummarizer; + +public final class ClusteringUtils { + private ClusteringUtils() { + } + + /** + * Computes the summaries for the distances in each cluster. + * @param datapoints iterable of datapoints. + * @param centroids iterable of Centroids. + * @return a list of OnlineSummarizers where the i-th element is the summarizer corresponding to the cluster whose + * index is i. + */ + public static List<OnlineSummarizer> summarizeClusterDistances(Iterable<? extends Vector> datapoints, + Iterable<? extends Vector> centroids, + DistanceMeasure distanceMeasure) { + UpdatableSearcher searcher = new ProjectionSearch(distanceMeasure, 3, 1); + searcher.addAll(centroids); + List<OnlineSummarizer> summarizers = new ArrayList<>(); + if (searcher.size() == 0) { + return summarizers; + } + for (int i = 0; i < searcher.size(); ++i) { + summarizers.add(new OnlineSummarizer()); + } + for (Vector v : datapoints) { + Centroid closest = (Centroid)searcher.search(v, 1).get(0).getValue(); + OnlineSummarizer summarizer = summarizers.get(closest.getIndex()); + summarizer.add(distanceMeasure.distance(v, closest)); + } + return summarizers; + } + + /** + * Adds up the distances from each point to its closest cluster and returns the sum. + * @param datapoints iterable of datapoints. 
+ * @param centroids iterable of Centroids. + * @return the total cost described above. + */ + public static double totalClusterCost(Iterable<? extends Vector> datapoints, Iterable<? extends Vector> centroids) { + DistanceMeasure distanceMeasure = new EuclideanDistanceMeasure(); + UpdatableSearcher searcher = new ProjectionSearch(distanceMeasure, 3, 1); + searcher.addAll(centroids); + return totalClusterCost(datapoints, searcher); + } + + /** + * Adds up the distances from each point to its closest cluster and returns the sum. + * @param datapoints iterable of datapoints. + * @param centroids searcher of Centroids. + * @return the total cost described above. + */ + public static double totalClusterCost(Iterable<? extends Vector> datapoints, Searcher centroids) { + double totalCost = 0; + for (Vector vector : datapoints) { + totalCost += centroids.searchFirst(vector, false).getWeight(); + } + return totalCost; + } + + /** + * Estimates the distance cutoff. In StreamingKMeans, the distance between two vectors divided + * by this value is used as a probability threshold when deciding whether to form a new cluster + * or not. + * Small values (comparable to the minimum distance between two points) are preferred as they + * make it highly likely that all but very close points are put in separate clusters + * initially. The clusters themselves are actually collapsed periodically when their number goes + * over the maximum number of clusters and the distanceCutoff is increased. + * So, the returned value is only an initial estimate. + * @param data the datapoints whose minimum pairwise distance is to be estimated. + * @param distanceMeasure the distance measure used to compute the distance between two points. + * @return the minimum distance between any two of the given points + * @see org.apache.mahout.clustering.streaming.cluster.StreamingKMeans#clusterInternal(Iterable, boolean) + */ + public static double estimateDistanceCutoff(List<? extends Vector> data, DistanceMeasure distanceMeasure) { + BruteSearch searcher = new BruteSearch(distanceMeasure); + searcher.addAll(data); + double minDistance = Double.POSITIVE_INFINITY; + for (Vector vector : data) { + double closest = searcher.searchFirst(vector, true).getWeight(); + if (minDistance > 0 && closest < minDistance) { + minDistance = closest; + } + searcher.add(vector); + } + return minDistance; + } + + public static <T extends Vector> double estimateDistanceCutoff( + Iterable<T> data, DistanceMeasure distanceMeasure, int sampleLimit) { + return estimateDistanceCutoff(Lists.newArrayList(Iterables.limit(data, sampleLimit)), distanceMeasure); + } + + /** + * Computes the Davies-Bouldin Index for a given clustering. + * See http://en.wikipedia.org/wiki/Clustering_algorithm#Internal_evaluation + * @param centroids list of centroids + * @param distanceMeasure distance measure for inter-cluster distances + * @param clusterDistanceSummaries summaries of the clusters; See summarizeClusterDistances + * @return the Davies-Bouldin Index + */ + public static double daviesBouldinIndex(List<? extends Vector> centroids, DistanceMeasure distanceMeasure, + List<OnlineSummarizer> clusterDistanceSummaries) { + Preconditions.checkArgument(centroids.size() == clusterDistanceSummaries.size(), + "Number of centroids and cluster summaries differ."); + int n = centroids.size(); + double totalDBIndex = 0; + // The inner loop shouldn't be reduced to j = i + 1 to n because the computation of the Davies-Bouldin + // index is not really symmetric. + // For a given cluster i, we look for a cluster j that maximizes the ratio of the sum of the average distances + // from points in cluster i to its center and from points in cluster j to its center to the distance between + // cluster i and cluster j. + // The maximization is the key issue, as the cluster that maximizes this ratio might be j for i but is NOT + // NECESSARILY i for j. + for (int i = 0; i < n; ++i) { + double averageDistanceI = clusterDistanceSummaries.get(i).getMean(); + double maxDBIndex = 0; + for (int j = 0; j < n; ++j) { + if (i != j) { + double dbIndex = (averageDistanceI + clusterDistanceSummaries.get(j).getMean()) + / distanceMeasure.distance(centroids.get(i), centroids.get(j)); + if (dbIndex > maxDBIndex) { + maxDBIndex = dbIndex; + } + } + } + totalDBIndex += maxDBIndex; + } + return totalDBIndex / n; + } + + /** + * Computes the Dunn Index of a given clustering. See http://en.wikipedia.org/wiki/Dunn_index + * @param centroids list of centroids + * @param distanceMeasure distance measure to compute inter-centroid distance with + * @param clusterDistanceSummaries summaries of the clusters; See summarizeClusterDistances + * @return the Dunn Index + */ + public static double dunnIndex(List<? extends Vector> centroids, DistanceMeasure distanceMeasure, + List<OnlineSummarizer> clusterDistanceSummaries) { + Preconditions.checkArgument(centroids.size() == clusterDistanceSummaries.size(), + "Number of centroids and cluster summaries differ."); + int n = centroids.size(); + // Intra-cluster distances will come from the OnlineSummarizer, and will be the median distance (noting that + // the median for just one value is that value). + // A variety of metrics can be used for the intra-cluster distance including max distance between two points, + // mean distance, etc. Median distance was chosen as this is more robust to outliers and characterizes the + // distribution of distances (from a point to the center) better. + double maxIntraClusterDistance = 0; + for (OnlineSummarizer summarizer : clusterDistanceSummaries) { + if (summarizer.getCount() > 0) { + double intraClusterDistance; + if (summarizer.getCount() == 1) { + intraClusterDistance = summarizer.getMean(); + } else { + intraClusterDistance = summarizer.getMedian(); + } + if (maxIntraClusterDistance < intraClusterDistance) { + maxIntraClusterDistance = intraClusterDistance; + } + } + } + double minDunnIndex = Double.POSITIVE_INFINITY; + for (int i = 0; i < n; ++i) { + // Distances are symmetric, so d(i, j) = d(j, i). + for (int j = i + 1; j < n; ++j) { + double dunnIndex = distanceMeasure.distance(centroids.get(i), centroids.get(j)); + if (minDunnIndex > dunnIndex) { + minDunnIndex = dunnIndex; + } + } + } + return minDunnIndex / maxIntraClusterDistance; + } + + public static double choose2(double n) { + return n * (n - 1) / 2; + } + + /** + * Creates a confusion matrix by searching for the closest cluster of both the row clustering and column clustering + * of a point and adding its weight to that cell of the matrix. + * It doesn't matter which clustering is the row clustering and which is the column clustering. If they're + * interchanged, the resulting matrix is the transpose of the original one. + * @param rowCentroids clustering one + * @param columnCentroids clustering two + * @param datapoints datapoints whose closest cluster we need to find + * @param distanceMeasure distance measure to use + * @return the confusion matrix + */ + public static Matrix getConfusionMatrix(List<? extends Vector> rowCentroids, List<?
extends Vector> columnCentroids, + Iterable<? extends Vector> datapoints, DistanceMeasure distanceMeasure) { + Searcher rowSearcher = new BruteSearch(distanceMeasure); + rowSearcher.addAll(rowCentroids); + Searcher columnSearcher = new BruteSearch(distanceMeasure); + columnSearcher.addAll(columnCentroids); + + int numRows = rowCentroids.size(); + int numCols = columnCentroids.size(); + Matrix confusionMatrix = new DenseMatrix(numRows, numCols); + + for (Vector vector : datapoints) { + WeightedThing<Vector> closestRowCentroid = rowSearcher.search(vector, 1).get(0); + WeightedThing<Vector> closestColumnCentroid = columnSearcher.search(vector, 1).get(0); + int row = ((Centroid) closestRowCentroid.getValue()).getIndex(); + int column = ((Centroid) closestColumnCentroid.getValue()).getIndex(); + double vectorWeight; + if (vector instanceof WeightedVector) { + vectorWeight = ((WeightedVector) vector).getWeight(); + } else { + vectorWeight = 1; + } + confusionMatrix.set(row, column, confusionMatrix.get(row, column) + vectorWeight); + } + + return confusionMatrix; + } + + /** + * Computes the Adjusted Rand Index for a given confusion matrix. + * @param confusionMatrix confusion matrix; not to be confused with the more restrictive ConfusionMatrix class + * @return the Adjusted Rand Index + */ + public static double getAdjustedRandIndex(Matrix confusionMatrix) { + int numRows = confusionMatrix.numRows(); + int numCols = confusionMatrix.numCols(); + double rowChoiceSum = 0; + double columnChoiceSum = 0; + double totalChoiceSum = 0; + double total = 0; + for (int i = 0; i < numRows; ++i) { + double rowSum = 0; + for (int j = 0; j < numCols; ++j) { + rowSum += confusionMatrix.get(i, j); + totalChoiceSum += choose2(confusionMatrix.get(i, j)); + } + total += rowSum; + rowChoiceSum += choose2(rowSum); + } + for (int j = 0; j < numCols; ++j) { + double columnSum = 0; + for (int i = 0; i < numRows; ++i) { + columnSum += confusionMatrix.get(i, j); + } + columnChoiceSum += choose2(columnSum); + } + double rowColumnChoiceSumDivTotal = rowChoiceSum * columnChoiceSum / choose2(total); + return (totalChoiceSum - rowColumnChoiceSumDivTotal) + / ((rowChoiceSum + columnChoiceSum) / 2 - rowColumnChoiceSumDivTotal); + } + + /** + * Computes the total weight of the points in the given Vector iterable. + * @param data iterable of points + * @return total weight + */ + public static double totalWeight(Iterable<? extends Vector> data) { + double sum = 0; + for (Vector row : data) { + Preconditions.checkNotNull(row); + if (row instanceof WeightedVector) { + sum += ((WeightedVector)row).getWeight(); + } else { + sum++; + } + } + return sum; + } +}
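The evaluation helpers above compose: summarizeClusterDistances produces the per-cluster OnlineSummarizers that daviesBouldinIndex and dunnIndex consume. The sketch below is not part of this commit; the class name and toy data are invented for illustration, and it assumes the centroids are Centroid instances whose indexes run 0..k-1 to match the summarizer positions.

    import java.util.Arrays;
    import java.util.List;

    import org.apache.mahout.clustering.ClusteringUtils;
    import org.apache.mahout.common.distance.DistanceMeasure;
    import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
    import org.apache.mahout.math.Centroid;
    import org.apache.mahout.math.DenseVector;
    import org.apache.mahout.math.Vector;
    import org.apache.mahout.math.stats.OnlineSummarizer;

    public class ClusterQualityExample {
      public static void main(String[] args) {
        // Toy data: two well-separated pairs of points in the plane.
        List<Vector> points = Arrays.<Vector>asList(
            new DenseVector(new double[] {0, 0}),
            new DenseVector(new double[] {0, 1}),
            new DenseVector(new double[] {10, 10}),
            new DenseVector(new double[] {10, 11}));
        // Centroid indexes matter: summarizeClusterDistances maps each point
        // to summarizers.get(closest.getIndex()), so indexes must be 0..k-1.
        List<Centroid> centroids = Arrays.asList(
            new Centroid(0, new DenseVector(new double[] {0, 0.5})),
            new Centroid(1, new DenseVector(new double[] {10, 10.5})));

        DistanceMeasure measure = new EuclideanDistanceMeasure();
        List<OnlineSummarizer> summaries =
            ClusteringUtils.summarizeClusterDistances(points, centroids, measure);

        // Lower Davies-Bouldin and higher Dunn both indicate compact,
        // well-separated clusters.
        double db = ClusteringUtils.daviesBouldinIndex(centroids, measure, summaries);
        double dunn = ClusteringUtils.dunnIndex(centroids, measure, summaries);
        System.out.println("Davies-Bouldin: " + db + "  Dunn: " + dunn);
      }
    }

Since the two indexes move in opposite directions for a good clustering, computing both from the same summaries gives a quick cross-check of a clustering result.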
http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/clustering/GaussianAccumulator.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/clustering/GaussianAccumulator.java b/community/mahout-mr/src/main/java/org/apache/mahout/clustering/GaussianAccumulator.java new file mode 100644 index 0000000..c25e039 --- /dev/null +++ b/community/mahout-mr/src/main/java/org/apache/mahout/clustering/GaussianAccumulator.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.clustering; + +import org.apache.mahout.math.Vector; + +public interface GaussianAccumulator { + + /** + * @return the number of observations + */ + double getN(); + + /** + * @return the mean of the observations + */ + Vector getMean(); + + /** + * @return the std of the observations + */ + Vector getStd(); + + /** + * @return the average of the vector std elements + */ + double getAverageStd(); + + /** + * @return the variance of the observations + */ + Vector getVariance(); + + /** + * Observe the vector + * + * @param x a Vector + * @param weight the double observation weight (usually 1.0) + */ + void observe(Vector x, double weight); + + /** + * Compute the mean, variance and standard deviation + */ + void compute(); + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/clustering/Model.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/clustering/Model.java b/community/mahout-mr/src/main/java/org/apache/mahout/clustering/Model.java new file mode 100644 index 0000000..79dab30 --- /dev/null +++ b/community/mahout-mr/src/main/java/org/apache/mahout/clustering/Model.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.clustering; + +import org.apache.hadoop.io.Writable; +import org.apache.mahout.math.VectorWritable; + +/** + * A model is a probability distribution over observed data points and allows + * the probability of any data point to be computed. All Models have a + * persistent representation and extend Writable. + */ +public interface Model<O> extends Writable { + + /** + * Return the probability that the observation is described by this model + * + * @param x + * an Observation from the posterior + * @return the probability that x is in the receiver + */ + double pdf(O x); + + /** + * Observe the given observation, retaining information about it + * + * @param x + * an Observation from the posterior + */ + void observe(O x); + + /** + * Observe the given observation, retaining information about it + * + * @param x + * an Observation from the posterior + * @param weight + * a double weighting factor + */ + void observe(O x, double weight); + + /** + * Observe the given model, retaining information about its observations + * + * @param x + * a Model<O> + */ + void observe(Model<O> x); + + /** + * Compute a new set of posterior parameters based upon the Observations that + * have been observed since my creation + */ + void computeParameters(); + + /** + * Return the number of observations that this model has seen since its + * parameters were last computed + * + * @return a long + */ + long getNumObservations(); + + /** + * Return the number of observations that this model has seen over its + * lifetime + * + * @return a long + */ + long getTotalObservations(); + + /** + * @return a sample of my posterior model + */ + Model<VectorWritable> sampleFromPosterior(); + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/clustering/ModelDistribution.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/clustering/ModelDistribution.java b/community/mahout-mr/src/main/java/org/apache/mahout/clustering/ModelDistribution.java new file mode 100644 index 0000000..d77bf40 --- /dev/null +++ b/community/mahout-mr/src/main/java/org/apache/mahout/clustering/ModelDistribution.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.clustering; + +/** A model distribution allows us to sample a model from its prior distribution.
*/ +public interface ModelDistribution<O> { + + /** + * Return a list of models sampled from the prior + * + * @param howMany + * the int number of models to return + * @return a Model<Observation>[] representing what is known a priori + */ + Model<O>[] sampleFromPrior(int howMany); + + /** + * Return a list of models sampled from the posterior + * + * @param posterior + * the Model<Observation>[] after observations + * @return a Model<Observation>[] representing what is known a posteriori + */ + Model<O>[] sampleFromPosterior(Model<O>[] posterior); + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/clustering/OnlineGaussianAccumulator.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/clustering/OnlineGaussianAccumulator.java b/community/mahout-mr/src/main/java/org/apache/mahout/clustering/OnlineGaussianAccumulator.java new file mode 100644 index 0000000..b76e00f --- /dev/null +++ b/community/mahout-mr/src/main/java/org/apache/mahout/clustering/OnlineGaussianAccumulator.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.mahout.clustering; + +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.function.SquareRootFunction; + +/** + * An online Gaussian statistics accumulator based upon Knuth (who cites Welford) which is known to be + * numerically stable.
See http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance + */ +public class OnlineGaussianAccumulator implements GaussianAccumulator { + + private double sumWeight; + private Vector mean; + private Vector s; + private Vector variance; + + @Override + public double getN() { + return sumWeight; + } + + @Override + public Vector getMean() { + return mean; + } + + @Override + public Vector getStd() { + return variance.clone().assign(new SquareRootFunction()); + } + + /* from Wikipedia: http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance + * + * Weighted incremental algorithm + * + * def weighted_incremental_variance(dataWeightPairs): + * mean = 0 + * S = 0 + * sumweight = 0 + * for x, weight in dataWeightPairs: # Alternately "for x in zip(data, weight):" + * temp = weight + sumweight + * Q = x - mean + * R = Q * weight / temp + * S = S + sumweight * Q * R + * mean = mean + R + * sumweight = temp + * Variance = S / (sumweight-1) # if sample is the population, omit -1 + * return Variance + */ + @Override + public void observe(Vector x, double weight) { + double temp = weight + sumWeight; + Vector q; + if (mean == null) { + mean = x.like(); + q = x.clone(); + } else { + q = x.minus(mean); + } + Vector r = q.times(weight).divide(temp); + if (s == null) { + s = q.times(sumWeight).times(r); + } else { + s = s.plus(q.times(sumWeight).times(r)); + } + mean = mean.plus(r); + sumWeight = temp; + variance = s.divide(sumWeight - 1); // # if sample is the population, omit -1 + } + + @Override + public void compute() { + // nothing to do here! + } + + @Override + public double getAverageStd() { + if (sumWeight == 0.0) { + return 0.0; + } else { + Vector std = getStd(); + return std.zSum() / std.size(); + } + } + + @Override + public Vector getVariance() { + return variance; + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/clustering/RunningSumsGaussianAccumulator.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/clustering/RunningSumsGaussianAccumulator.java b/community/mahout-mr/src/main/java/org/apache/mahout/clustering/RunningSumsGaussianAccumulator.java new file mode 100644 index 0000000..138e830 --- /dev/null +++ b/community/mahout-mr/src/main/java/org/apache/mahout/clustering/RunningSumsGaussianAccumulator.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.mahout.clustering; + +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.function.Functions; +import org.apache.mahout.math.function.SquareRootFunction; + +/** + * An online Gaussian accumulator that uses a running power sums approach as reported + * on http://en.wikipedia.org/wiki/Standard_deviation + * Suffers from overflow, underflow and roundoff error but has minimal observe-time overhead + */ +public class RunningSumsGaussianAccumulator implements GaussianAccumulator { + + private double s0; + private Vector s1; + private Vector s2; + private Vector mean; + private Vector std; + + @Override + public double getN() { + return s0; + } + + @Override + public Vector getMean() { + return mean; + } + + @Override + public Vector getStd() { + return std; + } + + @Override + public double getAverageStd() { + if (s0 == 0.0) { + return 0.0; + } else { + return std.zSum() / std.size(); + } + } + + @Override + public Vector getVariance() { + return std.times(std); + } + + @Override + public void observe(Vector x, double weight) { + s0 += weight; + Vector weightedX = x.times(weight); + if (s1 == null) { + s1 = weightedX; + } else { + s1.assign(weightedX, Functions.PLUS); + } + Vector x2 = x.times(x).times(weight); + if (s2 == null) { + s2 = x2; + } else { + s2.assign(x2, Functions.PLUS); + } + } + + @Override + public void compute() { + if (s0 != 0.0) { + mean = s1.divide(s0); + std = s2.times(s0).minus(s1.times(s1)).assign(new SquareRootFunction()).divide(s0); + } + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/clustering/UncommonDistributions.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/clustering/UncommonDistributions.java b/community/mahout-mr/src/main/java/org/apache/mahout/clustering/UncommonDistributions.java new file mode 100644 index 0000000..ef43e1b --- /dev/null +++ b/community/mahout-mr/src/main/java/org/apache/mahout/clustering/UncommonDistributions.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.clustering; + +import org.apache.commons.math3.distribution.NormalDistribution; +import org.apache.commons.math3.distribution.RealDistribution; +import org.apache.mahout.common.RandomUtils; +import org.apache.mahout.common.RandomWrapper; + +public final class UncommonDistributions { + + private static final RandomWrapper RANDOM = RandomUtils.getRandom(); + + private UncommonDistributions() {} + + // =============== start of BSD licensed code. See LICENSE.txt + /** + * Returns a double sampled according to this distribution. Uniformly fast for all k > 0. 
(Reference: + * Non-Uniform Random Variate Generation, Devroye http://cgm.cs.mcgill.ca/~luc/rnbookindex.html) Uses + * Cheng's rejection algorithm (GB) for k>=1, rejection from Weibull distribution for 0 < k < 1. + */ + public static double rGamma(double k, double lambda) { + boolean accept = false; + if (k >= 1.0) { + // Cheng's algorithm + double b = k - Math.log(4.0); + double c = k + Math.sqrt(2.0 * k - 1.0); + double lam = Math.sqrt(2.0 * k - 1.0); + double cheng = 1.0 + Math.log(4.5); + double x; + do { + double u = RANDOM.nextDouble(); + double v = RANDOM.nextDouble(); + double y = 1.0 / lam * Math.log(v / (1.0 - v)); + x = k * Math.exp(y); + double z = u * v * v; + double r = b + c * y - x; + if (r >= 4.5 * z - cheng || r >= Math.log(z)) { + accept = true; + } + } while (!accept); + return x / lambda; + } else { + // Weibull algorithm + double c = 1.0 / k; + double d = (1.0 - k) * Math.pow(k, k / (1.0 - k)); + double x; + do { + double u = RANDOM.nextDouble(); + double v = RANDOM.nextDouble(); + double z = -Math.log(u); + double e = -Math.log(v); + x = Math.pow(z, c); + if (z + e >= d + x) { + accept = true; + } + } while (!accept); + return x / lambda; + } + } + + // ============= end of BSD licensed code + + /** + * Returns a random sample from a beta distribution with the given shapes + * + * @param shape1 + * a double representing shape1 + * @param shape2 + * a double representing shape2 + * @return a double sample + */ + public static double rBeta(double shape1, double shape2) { + double gam1 = rGamma(shape1, 1.0); + double gam2 = rGamma(shape2, 1.0); + return gam1 / (gam1 + gam2); + + } + + /** + * Return a random value from a normal distribution with the given mean and standard deviation + * + * @param mean + * a double mean value + * @param sd + * a double standard deviation + * @return a double sample + */ + public static double rNorm(double mean, double sd) { + RealDistribution dist = new NormalDistribution(RANDOM.getRandomGenerator(), + mean, + sd, + NormalDistribution.DEFAULT_INVERSE_ABSOLUTE_ACCURACY); + return dist.sample(); + } + + /** + * Returns an integer sampled according to this distribution. Takes time proportional to np + 1. (Reference: + * Non-Uniform Random Variate Generation, Devroye http://cgm.cs.mcgill.ca/~luc/rnbookindex.html) Second + * time-waiting algorithm. + */ + public static int rBinomial(int n, double p) { + if (p >= 1.0) { + return n; // needed to avoid infinite loops and negative results + } + double q = -Math.log1p(-p); + double sum = 0.0; + int x = 0; + while (sum <= q) { + double u = RANDOM.nextDouble(); + double e = -Math.log(u); + sum += e / (n - x); + x++; + } + if (x == 0) { + return 0; + } + return x - 1; + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/clustering/canopy/Canopy.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/clustering/canopy/Canopy.java b/community/mahout-mr/src/main/java/org/apache/mahout/clustering/canopy/Canopy.java new file mode 100644 index 0000000..930fd44 --- /dev/null +++ b/community/mahout-mr/src/main/java/org/apache/mahout/clustering/canopy/Canopy.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.clustering.canopy; + +import org.apache.mahout.clustering.iterator.DistanceMeasureCluster; +import org.apache.mahout.common.distance.DistanceMeasure; +import org.apache.mahout.math.Vector; + +/** + * This class models a canopy as a center point, the number of points that are contained within it according + * to the application of some distance metric, and a point total which is the sum of all the points and is + * used to compute the centroid when needed. + */ +@Deprecated +public class Canopy extends DistanceMeasureCluster { + + /** Used for deserialization as a writable */ + public Canopy() { } + + /** + * Create a new Canopy containing the given point and canopyId + * + * @param center a point in vector space + * @param canopyId an int identifying the canopy local to this process only + * @param measure a DistanceMeasure to use + */ + public Canopy(Vector center, int canopyId, DistanceMeasure measure) { + super(center, canopyId, measure); + observe(center); + } + + public String asFormatString() { + return "C" + this.getId() + ": " + this.computeCentroid().asFormatString(); + } + + @Override + public String toString() { + return getIdentifier() + ": " + getCenter().asFormatString(); + } + + @Override + public String getIdentifier() { + return "C-" + getId(); + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusterer.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusterer.java b/community/mahout-mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusterer.java new file mode 100644 index 0000000..3ce4757 --- /dev/null +++ b/community/mahout-mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyClusterer.java @@ -0,0 +1,220 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.clustering.canopy; + +import java.util.Collection; +import java.util.Iterator; +import java.util.List; + +import org.apache.mahout.clustering.AbstractCluster; +import org.apache.mahout.common.distance.DistanceMeasure; +import org.apache.mahout.math.Vector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; + +@Deprecated +public class CanopyClusterer { + + private static final Logger log = LoggerFactory.getLogger(CanopyClusterer.class); + + private int nextCanopyId; + + // the T1 distance threshold + private double t1; + + // the T2 distance threshold + private double t2; + + // the T3 distance threshold + private double t3; + + // the T4 distance threshold + private double t4; + + // the distance measure + private DistanceMeasure measure; + + public CanopyClusterer(DistanceMeasure measure, double t1, double t2) { + this.t1 = t1; + this.t2 = t2; + this.t3 = t1; + this.t4 = t2; + this.measure = measure; + } + + public double getT1() { + return t1; + } + + public double getT2() { + return t2; + } + + public double getT3() { + return t3; + } + + public double getT4() { + return t4; + } + + /** + * Used by CanopyReducer to set t1=t3 and t2=t4 configuration values + */ + public void useT3T4() { + t1 = t3; + t2 = t4; + } + + /** + * This is the same algorithm as the reference but inverted to iterate over + * existing canopies instead of the points. Because of this it does not need + * to actually store the points, instead storing a total points vector and + * the number of points. From this a centroid can be computed. + * <p/> + * This method is used by the CanopyMapper, CanopyReducer and CanopyDriver. + * + * @param point + * the point to be added + * @param canopies + * the Collection<Canopy> to be appended to + */ + public void addPointToCanopies(Vector point, Collection<Canopy> canopies) { + boolean pointStronglyBound = false; + for (Canopy canopy : canopies) { + double dist = measure.distance(canopy.getCenter().getLengthSquared(), canopy.getCenter(), point); + if (dist < t1) { + if (log.isDebugEnabled()) { + log.debug("Added point: {} to canopy: {}", AbstractCluster.formatVector(point, null), canopy.getIdentifier()); + } + canopy.observe(point); + } + pointStronglyBound = pointStronglyBound || dist < t2; + } + if (!pointStronglyBound) { + if (log.isDebugEnabled()) { + log.debug("Created new Canopy:{} at center:{}", nextCanopyId, AbstractCluster.formatVector(point, null)); + } + canopies.add(new Canopy(point, nextCanopyId++, measure)); + } + } + + /** + * Return whether the point is covered by the canopy + * + * @param point + * a point + * @return true if the point is covered + */ + public boolean canopyCovers(Canopy canopy, Vector point) { + return measure.distance(canopy.getCenter().getLengthSquared(), canopy.getCenter(), point) < t1; + } + + /** + * Iterate through the points, adding new canopies. Return the canopies.
+ * + * @param points + * a list<Vector> defining the points to be clustered + * @param measure + * a DistanceMeasure to use + * @param t1 + * the T1 distance threshold + * @param t2 + * the T2 distance threshold + * @return the List<Canopy> created + */ + public static List<Canopy> createCanopies(List<Vector> points, + DistanceMeasure measure, + double t1, + double t2) { + List<Canopy> canopies = Lists.newArrayList(); + /** + * Reference Implementation: Given a distance metric, one can create + * canopies as follows: Start with a list of the data points in any + * order, and with two distance thresholds, T1 and T2, where T1 > T2. + * (These thresholds can be set by the user, or selected by + * cross-validation.) Pick a point on the list and measure its distance + * to all other points. Put all points that are within distance + * threshold T1 into a canopy. Remove from the list all points that are + * within distance threshold T2. Repeat until the list is empty. + */ + int nextCanopyId = 0; + while (!points.isEmpty()) { + Iterator<Vector> ptIter = points.iterator(); + Vector p1 = ptIter.next(); + ptIter.remove(); + Canopy canopy = new Canopy(p1, nextCanopyId++, measure); + canopies.add(canopy); + while (ptIter.hasNext()) { + Vector p2 = ptIter.next(); + double dist = measure.distance(p1, p2); + // Put all points that are within distance threshold T1 into the + // canopy + if (dist < t1) { + canopy.observe(p2); + } + // Remove from the list all points that are within distance + // threshold T2 + if (dist < t2) { + ptIter.remove(); + } + } + for (Canopy c : canopies) { + c.computeParameters(); + } + } + return canopies; + } + + /** + * Iterate through the canopies, adding their centroids to a list + * + * @param canopies + * a List<Canopy> + * @return the List<Vector> + */ + public static List<Vector> getCenters(Iterable<Canopy> canopies) { + List<Vector> result = Lists.newArrayList(); + for (Canopy canopy : canopies) { + result.add(canopy.getCenter()); + } + return result; + } + + /** + * Iterate through the canopies, resetting their center to their centroids + * + * @param canopies + * a List<Canopy> + */ + public static void updateCentroids(Iterable<Canopy> canopies) { + for (Canopy canopy : canopies) { + canopy.computeParameters(); + } + } + + public void setT3(double t3) { + this.t3 = t3; + } + + public void setT4(double t4) { + this.t4 = t4; + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyConfigKeys.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyConfigKeys.java b/community/mahout-mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyConfigKeys.java new file mode 100644 index 0000000..2f24026 --- /dev/null +++ b/community/mahout-mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyConfigKeys.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.clustering.canopy; + +import org.apache.hadoop.conf.Configuration; +import org.apache.mahout.common.ClassUtils; +import org.apache.mahout.common.distance.DistanceMeasure; + +@Deprecated +public final class CanopyConfigKeys { + + private CanopyConfigKeys() {} + + public static final String T1_KEY = "org.apache.mahout.clustering.canopy.t1"; + + public static final String T2_KEY = "org.apache.mahout.clustering.canopy.t2"; + + public static final String T3_KEY = "org.apache.mahout.clustering.canopy.t3"; + + public static final String T4_KEY = "org.apache.mahout.clustering.canopy.t4"; + + // keys used by Driver, Mapper, Combiner & Reducer + public static final String DISTANCE_MEASURE_KEY = "org.apache.mahout.clustering.canopy.measure"; + + public static final String CF_KEY = "org.apache.mahout.clustering.canopy.canopyFilter"; + + /** + * Create a {@link CanopyClusterer} from the Hadoop configuration. + * + * @param configuration Hadoop configuration + * + * @return CanopyClusterer + */ + public static CanopyClusterer configureCanopyClusterer(Configuration configuration) { + double t1 = Double.parseDouble(configuration.get(T1_KEY)); + double t2 = Double.parseDouble(configuration.get(T2_KEY)); + + DistanceMeasure measure = ClassUtils.instantiateAs(configuration.get(DISTANCE_MEASURE_KEY), DistanceMeasure.class); + measure.configure(configuration); + + CanopyClusterer canopyClusterer = new CanopyClusterer(measure, t1, t2); + + String d = configuration.get(T3_KEY); + if (d != null) { + canopyClusterer.setT3(Double.parseDouble(d)); + } + + d = configuration.get(T4_KEY); + if (d != null) { + canopyClusterer.setT4(Double.parseDouble(d)); + } + return canopyClusterer; + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java b/community/mahout-mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java new file mode 100644 index 0000000..06dc947 --- /dev/null +++ b/community/mahout-mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java @@ -0,0 +1,379 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.clustering.canopy; + +import java.io.IOException; +import java.util.Collection; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.hadoop.util.ToolRunner; +import org.apache.mahout.clustering.AbstractCluster; +import org.apache.mahout.clustering.Cluster; +import org.apache.mahout.clustering.classify.ClusterClassificationDriver; +import org.apache.mahout.clustering.classify.ClusterClassifier; +import org.apache.mahout.clustering.iterator.CanopyClusteringPolicy; +import org.apache.mahout.clustering.iterator.ClusterWritable; +import org.apache.mahout.clustering.topdown.PathDirectory; +import org.apache.mahout.common.AbstractJob; +import org.apache.mahout.common.ClassUtils; +import org.apache.mahout.common.HadoopUtil; +import org.apache.mahout.common.commandline.DefaultOptionCreator; +import org.apache.mahout.common.distance.DistanceMeasure; +import org.apache.mahout.common.iterator.sequencefile.PathFilters; +import org.apache.mahout.common.iterator.sequencefile.PathType; +import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterable; +import org.apache.mahout.math.VectorWritable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; +import com.google.common.io.Closeables; + +@Deprecated +public class CanopyDriver extends AbstractJob { + + public static final String DEFAULT_CLUSTERED_POINTS_DIRECTORY = "clusteredPoints"; + + private static final Logger log = LoggerFactory.getLogger(CanopyDriver.class); + + public static void main(String[] args) throws Exception { + ToolRunner.run(new Configuration(), new CanopyDriver(), args); + } + + @Override + public int run(String[] args) throws Exception { + + addInputOption(); + addOutputOption(); + addOption(DefaultOptionCreator.distanceMeasureOption().create()); + addOption(DefaultOptionCreator.t1Option().create()); + addOption(DefaultOptionCreator.t2Option().create()); + addOption(DefaultOptionCreator.t3Option().create()); + addOption(DefaultOptionCreator.t4Option().create()); + addOption(DefaultOptionCreator.clusterFilterOption().create()); + addOption(DefaultOptionCreator.overwriteOption().create()); + addOption(DefaultOptionCreator.clusteringOption().create()); + addOption(DefaultOptionCreator.methodOption().create()); + addOption(DefaultOptionCreator.outlierThresholdOption().create()); + + if (parseArguments(args) == null) { + return -1; + } + + Path input = getInputPath(); + Path output = getOutputPath(); + Configuration conf = getConf(); + if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) { + HadoopUtil.delete(conf, output); + } + String measureClass = getOption(DefaultOptionCreator.DISTANCE_MEASURE_OPTION); + double t1 = Double.parseDouble(getOption(DefaultOptionCreator.T1_OPTION)); + double t2 = Double.parseDouble(getOption(DefaultOptionCreator.T2_OPTION)); + double t3 = t1; + if (hasOption(DefaultOptionCreator.T3_OPTION)) { + t3 = Double.parseDouble(getOption(DefaultOptionCreator.T3_OPTION)); + } + double t4 = t2; + if (hasOption(DefaultOptionCreator.T4_OPTION)) 
{ + t4 = Double.parseDouble(getOption(DefaultOptionCreator.T4_OPTION)); + } + int clusterFilter = 0; + if (hasOption(DefaultOptionCreator.CLUSTER_FILTER_OPTION)) { + clusterFilter = Integer + .parseInt(getOption(DefaultOptionCreator.CLUSTER_FILTER_OPTION)); + } + boolean runClustering = hasOption(DefaultOptionCreator.CLUSTERING_OPTION); + boolean runSequential = getOption(DefaultOptionCreator.METHOD_OPTION) + .equalsIgnoreCase(DefaultOptionCreator.SEQUENTIAL_METHOD); + DistanceMeasure measure = ClassUtils.instantiateAs(measureClass, DistanceMeasure.class); + double clusterClassificationThreshold = 0.0; + if (hasOption(DefaultOptionCreator.OUTLIER_THRESHOLD)) { + clusterClassificationThreshold = Double.parseDouble(getOption(DefaultOptionCreator.OUTLIER_THRESHOLD)); + } + run(conf, input, output, measure, t1, t2, t3, t4, clusterFilter, + runClustering, clusterClassificationThreshold, runSequential); + return 0; + } + + /** + * Build a directory of Canopy clusters from the input arguments and, if + * requested, cluster the input vectors using these clusters + * + * @param conf + * the Configuration + * @param input + * the Path to the directory containing input vectors + * @param output + * the Path for all output directories + * @param measure + * the DistanceMeasure + * @param t1 + * the double T1 distance metric + * @param t2 + * the double T2 distance metric + * @param t3 + * the reducer's double T1 distance metric + * @param t4 + * the reducer's double T2 distance metric + * @param clusterFilter + * the minimum canopy size output by the mappers + * @param runClustering + * cluster the input vectors if true + * @param clusterClassificationThreshold + * vectors having pdf below this value will not be clustered. Its value should be between 0 and 1. + * @param runSequential + * execute sequentially if true + */ + public static void run(Configuration conf, Path input, Path output, + DistanceMeasure measure, double t1, double t2, double t3, double t4, + int clusterFilter, boolean runClustering, double clusterClassificationThreshold, boolean runSequential) + throws IOException, InterruptedException, ClassNotFoundException { + Path clustersOut = buildClusters(conf, input, output, measure, t1, t2, t3, + t4, clusterFilter, runSequential); + if (runClustering) { + clusterData(conf, input, clustersOut, output, clusterClassificationThreshold, runSequential); + } + } + + /** + * Convenience method to provide backward compatibility + */ + public static void run(Configuration conf, Path input, Path output, + DistanceMeasure measure, double t1, double t2, boolean runClustering, + double clusterClassificationThreshold, boolean runSequential) throws IOException, InterruptedException, + ClassNotFoundException { + run(conf, input, output, measure, t1, t2, t1, t2, 0, runClustering, + clusterClassificationThreshold, runSequential); + } + + /** + * Convenience method creates new Configuration() Build a directory of Canopy + * clusters from the input arguments and, if requested, cluster the input + * vectors using these clusters + * + * @param input + * the Path to the directory containing input vectors + * @param output + * the Path for all output directories + * @param t1 + * the double T1 distance metric + * @param t2 + * the double T2 distance metric + * @param runClustering + * cluster the input vectors if true + * @param clusterClassificationThreshold + * vectors having pdf below this value will not be clustered. Its value should be between 0 and 1. 
+ * @param runSequential + * execute sequentially if true + */ + public static void run(Path input, Path output, DistanceMeasure measure, + double t1, double t2, boolean runClustering, double clusterClassificationThreshold, boolean runSequential) + throws IOException, InterruptedException, ClassNotFoundException { + run(new Configuration(), input, output, measure, t1, t2, runClustering, + clusterClassificationThreshold, runSequential); + } + + /** + * Convenience method for backwards compatibility + * + */ + public static Path buildClusters(Configuration conf, Path input, Path output, + DistanceMeasure measure, double t1, double t2, int clusterFilter, + boolean runSequential) throws IOException, InterruptedException, + ClassNotFoundException { + return buildClusters(conf, input, output, measure, t1, t2, t1, t2, + clusterFilter, runSequential); + } + + /** + * Build a directory of Canopy clusters from the input vectors and other + * arguments. Run sequential or mapreduce execution as requested + * + * @param conf + * the Configuration to use + * @param input + * the Path to the directory containing input vectors + * @param output + * the Path for all output directories + * @param measure + * the DistanceMeasure + * @param t1 + * the double T1 distance metric + * @param t2 + * the double T2 distance metric + * @param t3 + * the reducer's double T1 distance metric + * @param t4 + * the reducer's double T2 distance metric + * @param clusterFilter + * the int minimum size of canopies produced + * @param runSequential + * a boolean indicates to run the sequential (reference) algorithm + * @return the canopy output directory Path + */ + public static Path buildClusters(Configuration conf, Path input, Path output, + DistanceMeasure measure, double t1, double t2, double t3, double t4, + int clusterFilter, boolean runSequential) throws IOException, + InterruptedException, ClassNotFoundException { + log.info("Build Clusters Input: {} Out: {} Measure: {} t1: {} t2: {}", + input, output, measure, t1, t2); + if (runSequential) { + return buildClustersSeq(input, output, measure, t1, t2, clusterFilter); + } else { + return buildClustersMR(conf, input, output, measure, t1, t2, t3, t4, + clusterFilter); + } + } + + /** + * Build a directory of Canopy clusters from the input vectors and other + * arguments. 
Run sequential execution + * + * @param input + * the Path to the directory containing input vectors + * @param output + * the Path for all output directories + * @param measure + * the DistanceMeasure + * @param t1 + * the double T1 distance metric + * @param t2 + * the double T2 distance metric + * @param clusterFilter + * the int minimum size of canopies produced + * @return the canopy output directory Path + */ + private static Path buildClustersSeq(Path input, Path output, + DistanceMeasure measure, double t1, double t2, int clusterFilter) + throws IOException { + CanopyClusterer clusterer = new CanopyClusterer(measure, t1, t2); + Collection<Canopy> canopies = Lists.newArrayList(); + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.get(input.toUri(), conf); + + for (VectorWritable vw : new SequenceFileDirValueIterable<VectorWritable>( + input, PathType.LIST, PathFilters.logsCRCFilter(), conf)) { + clusterer.addPointToCanopies(vw.get(), canopies); + } + + Path canopyOutputDir = new Path(output, Cluster.CLUSTERS_DIR + '0' + Cluster.FINAL_ITERATION_SUFFIX); + Path path = new Path(canopyOutputDir, "part-r-00000"); + SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path, + Text.class, ClusterWritable.class); + try { + ClusterWritable clusterWritable = new ClusterWritable(); + for (Canopy canopy : canopies) { + canopy.computeParameters(); + if (log.isDebugEnabled()) { + log.debug("Writing Canopy:{} center:{} numPoints:{} radius:{}", + canopy.getIdentifier(), + AbstractCluster.formatVector(canopy.getCenter(), null), + canopy.getNumObservations(), + AbstractCluster.formatVector(canopy.getRadius(), null)); + } + if (canopy.getNumObservations() > clusterFilter) { + clusterWritable.setValue(canopy); + writer.append(new Text(canopy.getIdentifier()), clusterWritable); + } + } + } finally { + Closeables.close(writer, false); + } + return canopyOutputDir; + } + + /** + * Build a directory of Canopy clusters from the input vectors and other + * arguments. 
Run mapreduce execution + * + * @param conf + * the Configuration + * @param input + * the Path to the directory containing input vectors + * @param output + * the Path for all output directories + * @param measure + * the DistanceMeasure + * @param t1 + * the double T1 distance metric + * @param t2 + * the double T2 distance metric + * @param t3 + * the reducer's double T1 distance metric + * @param t4 + * the reducer's double T2 distance metric + * @param clusterFilter + * the int minimum size of canopies produced + * @return the canopy output directory Path + */ + private static Path buildClustersMR(Configuration conf, Path input, + Path output, DistanceMeasure measure, double t1, double t2, double t3, + double t4, int clusterFilter) throws IOException, InterruptedException, + ClassNotFoundException { + conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY, measure.getClass() + .getName()); + conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(t1)); + conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(t2)); + conf.set(CanopyConfigKeys.T3_KEY, String.valueOf(t3)); + conf.set(CanopyConfigKeys.T4_KEY, String.valueOf(t4)); + conf.set(CanopyConfigKeys.CF_KEY, String.valueOf(clusterFilter)); + + Job job = new Job(conf, "Canopy Driver running buildClusters over input: " + + input); + job.setInputFormatClass(SequenceFileInputFormat.class); + job.setOutputFormatClass(SequenceFileOutputFormat.class); + job.setMapperClass(CanopyMapper.class); + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(VectorWritable.class); + job.setReducerClass(CanopyReducer.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(ClusterWritable.class); + job.setNumReduceTasks(1); + job.setJarByClass(CanopyDriver.class); + + FileInputFormat.addInputPath(job, input); + Path canopyOutputDir = new Path(output, Cluster.CLUSTERS_DIR + '0' + Cluster.FINAL_ITERATION_SUFFIX); + FileOutputFormat.setOutputPath(job, canopyOutputDir); + if (!job.waitForCompletion(true)) { + throw new InterruptedException("Canopy Job failed processing " + input); + } + return canopyOutputDir; + } + + private static void clusterData(Configuration conf, + Path points, + Path canopies, + Path output, + double clusterClassificationThreshold, + boolean runSequential) + throws IOException, InterruptedException, ClassNotFoundException { + ClusterClassifier.writePolicy(new CanopyClusteringPolicy(), canopies); + ClusterClassificationDriver.run(conf, points, output, new Path(output, PathDirectory.CLUSTERED_POINTS_DIRECTORY), + clusterClassificationThreshold, true, runSequential); + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyMapper.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyMapper.java b/community/mahout-mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyMapper.java new file mode 100644 index 0000000..265d3da --- /dev/null +++ b/community/mahout-mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyMapper.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.clustering.canopy; + +import java.io.IOException; +import java.util.Collection; + +import com.google.common.collect.Lists; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.mahout.math.VectorWritable; + +@Deprecated +class CanopyMapper extends + Mapper<WritableComparable<?>, VectorWritable, Text, VectorWritable> { + + private final Collection<Canopy> canopies = Lists.newArrayList(); + + private CanopyClusterer canopyClusterer; + + private int clusterFilter; + + @Override + protected void map(WritableComparable<?> key, VectorWritable point, + Context context) throws IOException, InterruptedException { + canopyClusterer.addPointToCanopies(point.get(), canopies); + } + + @Override + protected void setup(Context context) throws IOException, + InterruptedException { + super.setup(context); + canopyClusterer = CanopyConfigKeys.configureCanopyClusterer(context.getConfiguration()); + clusterFilter = Integer.parseInt(context.getConfiguration().get( + CanopyConfigKeys.CF_KEY)); + } + + @Override + protected void cleanup(Context context) throws IOException, + InterruptedException { + for (Canopy canopy : canopies) { + canopy.computeParameters(); + if (canopy.getNumObservations() > clusterFilter) { + context.write(new Text("centroid"), new VectorWritable(canopy + .getCenter())); + } + } + super.cleanup(context); + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyReducer.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyReducer.java b/community/mahout-mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyReducer.java new file mode 100644 index 0000000..cdd7d5e --- /dev/null +++ b/community/mahout-mr/src/main/java/org/apache/mahout/clustering/canopy/CanopyReducer.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.clustering.canopy; + +import java.io.IOException; +import java.util.Collection; + +import com.google.common.collect.Lists; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.mahout.clustering.iterator.ClusterWritable; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorWritable; + +@Deprecated +public class CanopyReducer extends Reducer<Text, VectorWritable, Text, ClusterWritable> { + + private final Collection<Canopy> canopies = Lists.newArrayList(); + + private CanopyClusterer canopyClusterer; + + private int clusterFilter; + + CanopyClusterer getCanopyClusterer() { + return canopyClusterer; + } + + @Override + protected void reduce(Text arg0, Iterable<VectorWritable> values, + Context context) throws IOException, InterruptedException { + for (VectorWritable value : values) { + Vector point = value.get(); + canopyClusterer.addPointToCanopies(point, canopies); + } + for (Canopy canopy : canopies) { + canopy.computeParameters(); + if (canopy.getNumObservations() > clusterFilter) { + ClusterWritable clusterWritable = new ClusterWritable(); + clusterWritable.setValue(canopy); + context.write(new Text(canopy.getIdentifier()), clusterWritable); + } + } + } + + @Override + protected void setup(Context context) throws IOException, + InterruptedException { + super.setup(context); + canopyClusterer = CanopyConfigKeys.configureCanopyClusterer(context.getConfiguration()); + canopyClusterer.useT3T4(); + clusterFilter = Integer.parseInt(context.getConfiguration().get( + CanopyConfigKeys.CF_KEY)); + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationConfigKeys.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationConfigKeys.java b/community/mahout-mr/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationConfigKeys.java new file mode 100644 index 0000000..6b88388 --- /dev/null +++ b/community/mahout-mr/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationConfigKeys.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.clustering.classify; + +/** + * Constants used in Cluster Classification. 
+ */ +public final class ClusterClassificationConfigKeys { + + public static final String CLUSTERS_IN = "clusters_in"; + + public static final String OUTLIER_REMOVAL_THRESHOLD = "pdf_threshold"; + + public static final String EMIT_MOST_LIKELY = "emit_most_likely"; + + private ClusterClassificationConfigKeys() { + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/5eda9e1f/community/mahout-mr/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationDriver.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationDriver.java b/community/mahout-mr/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationDriver.java new file mode 100644 index 0000000..ead95cf --- /dev/null +++ b/community/mahout-mr/src/main/java/org/apache/mahout/clustering/classify/ClusterClassificationDriver.java @@ -0,0 +1,313 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.clustering.classify; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.hadoop.util.ToolRunner; +import org.apache.mahout.clustering.Cluster; +import org.apache.mahout.clustering.iterator.ClusterWritable; +import org.apache.mahout.clustering.iterator.ClusteringPolicy; +import org.apache.mahout.clustering.iterator.DistanceMeasureCluster; +import org.apache.mahout.common.AbstractJob; +import org.apache.mahout.common.Pair; +import org.apache.mahout.common.commandline.DefaultOptionCreator; +import org.apache.mahout.common.distance.DistanceMeasure; +import org.apache.mahout.common.iterator.sequencefile.PathFilters; +import org.apache.mahout.common.iterator.sequencefile.PathType; +import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable; +import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterator; +import org.apache.mahout.math.NamedVector; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.Vector.Element; 
+import org.apache.mahout.math.VectorWritable;
+
+/**
+ * Classifies the vectors into different clusters found by the clustering
+ * algorithm.
+ */
+public final class ClusterClassificationDriver extends AbstractJob {
+
+  /**
+   * CLI to run Cluster Classification Driver.
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+
+    addInputOption();
+    addOutputOption();
+    addOption(DefaultOptionCreator.methodOption().create());
+    addOption(DefaultOptionCreator.clustersInOption()
+        .withDescription("The input centroids, as Vectors. Must be a SequenceFile of Writable, Cluster/Canopy.")
+        .create());
+
+    if (parseArguments(args) == null) {
+      return -1;
+    }
+
+    Path input = getInputPath();
+    Path output = getOutputPath();
+
+    if (getConf() == null) {
+      setConf(new Configuration());
+    }
+    Path clustersIn = new Path(getOption(DefaultOptionCreator.CLUSTERS_IN_OPTION));
+    boolean runSequential = getOption(DefaultOptionCreator.METHOD_OPTION).equalsIgnoreCase(
+        DefaultOptionCreator.SEQUENTIAL_METHOD);
+
+    double clusterClassificationThreshold = 0.0;
+    if (hasOption(DefaultOptionCreator.OUTLIER_THRESHOLD)) {
+      clusterClassificationThreshold = Double.parseDouble(getOption(DefaultOptionCreator.OUTLIER_THRESHOLD));
+    }
+
+    run(getConf(), input, clustersIn, output, clusterClassificationThreshold, true, runSequential);
+
+    return 0;
+  }
+
+  /**
+   * Constructor to be used by the ToolRunner.
+   */
+  private ClusterClassificationDriver() {
+  }
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new Configuration(), new ClusterClassificationDriver(), args);
+  }
+
+  /**
+   * Uses {@link ClusterClassifier} to classify input vectors into their
+   * respective clusters.
+   *
+   * @param conf
+   *          the Hadoop Configuration
+   * @param input
+   *          the input vectors
+   * @param clusteringOutputPath
+   *          the output path of the clustering (the clusters-*-final file is
+   *          read from here)
+   * @param output
+   *          the location to store the classified vectors
+   * @param clusterClassificationThreshold
+   *          the threshold value of the probability distribution function, from
+   *          0.0 to 1.0. Any vector with a pdf less than this threshold will
+   *          not be classified for the cluster.
+   * @param emitMostLikely
+   *          true to emit only the most likely cluster for each vector
+   * @param runSequential
+   *          run the process sequentially or as a mapreduce job
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws ClassNotFoundException
+   */
+  public static void run(Configuration conf, Path input, Path clusteringOutputPath, Path output,
+      Double clusterClassificationThreshold, boolean emitMostLikely, boolean runSequential)
+      throws IOException, InterruptedException, ClassNotFoundException {
+    if (runSequential) {
+      classifyClusterSeq(conf, input, clusteringOutputPath, output, clusterClassificationThreshold, emitMostLikely);
+    } else {
+      classifyClusterMR(conf, input, clusteringOutputPath, output, clusterClassificationThreshold, emitMostLikely);
+    }
+  }
+
+  private static void classifyClusterSeq(Configuration conf, Path input, Path clusters, Path output,
+      Double clusterClassificationThreshold, boolean emitMostLikely) throws IOException {
+    List<Cluster> clusterModels = populateClusterModels(clusters, conf);
+    ClusteringPolicy policy = ClusterClassifier.readPolicy(finalClustersPath(conf, clusters));
+    ClusterClassifier clusterClassifier = new ClusterClassifier(clusterModels, policy);
+    selectCluster(input, clusterModels, clusterClassifier, output, clusterClassificationThreshold, emitMostLikely);
+  }
+
+  /**
+   * Populates a list with the clusters present in the clusters-*-final directory.
+   *
+   * @param clusterOutputPath
+   *          the output path of the clustering
+   * @param conf
+   *          the Hadoop Configuration
+   * @return the list of clusters found by the clustering
+   * @throws IOException
+   */
+  private static List<Cluster> populateClusterModels(Path clusterOutputPath, Configuration conf) throws IOException {
+    List<Cluster> clusterModels = new ArrayList<>();
+    Path finalClustersPath = finalClustersPath(conf, clusterOutputPath);
+    Iterator<?> it = new SequenceFileDirValueIterator<>(finalClustersPath, PathType.LIST,
+        PathFilters.partFilter(), null, false, conf);
+    while (it.hasNext()) {
+      ClusterWritable next = (ClusterWritable) it.next();
+      Cluster cluster = next.getValue();
+      cluster.configure(conf);
+      clusterModels.add(cluster);
+    }
+    return clusterModels;
+  }
+
+  private static Path finalClustersPath(Configuration conf, Path clusterOutputPath) throws IOException {
+    FileSystem fileSystem = clusterOutputPath.getFileSystem(conf);
+    FileStatus[] clusterFiles = fileSystem.listStatus(clusterOutputPath, PathFilters.finalPartFilter());
+    return clusterFiles[0].getPath();
+  }
+
+  /**
+   * Classifies each vector into its respective cluster.
+   *
+   * @param input
+   *          the path containing the input vectors
+   * @param clusterModels
+   *          the clusters
+   * @param clusterClassifier
+   *          used to classify the vectors into different clusters
+   * @param output
+   *          the path to store the classified data
+   * @param clusterClassificationThreshold
+   *          the threshold value of the probability distribution function, from
+   *          0.0 to 1.0. Any vector with a pdf less than this threshold will
+   *          not be classified for the cluster
+   * @param emitMostLikely
+   *          emit the vectors with the max pdf values per cluster
+   * @throws IOException
+   */
+  private static void selectCluster(Path input, List<Cluster> clusterModels, ClusterClassifier clusterClassifier,
+      Path output, Double clusterClassificationThreshold, boolean emitMostLikely) throws IOException {
+    Configuration conf = new Configuration();
+    SequenceFile.Writer writer = new SequenceFile.Writer(input.getFileSystem(conf), conf,
+        new Path(output, "part-m-" + 0), IntWritable.class, WeightedPropertyVectorWritable.class);
+    for (Pair<Writable, VectorWritable> vw : new SequenceFileDirIterable<Writable, VectorWritable>(input,
+        PathType.LIST, PathFilters.logsCRCFilter(), conf)) {
+      // Convert to NamedVectors to preserve the vectorId; otherwise it is not obvious which point
+      // belongs to which cluster - fix for MAHOUT-1410
+      Class<? extends Writable> keyClass = vw.getFirst().getClass();
+      Vector vector = vw.getSecond().get();
+      if (!keyClass.equals(NamedVector.class)) {
+        if (keyClass.equals(Text.class)) {
+          vector = new NamedVector(vector, vw.getFirst().toString());
+        } else if (keyClass.equals(IntWritable.class)) {
+          vector = new NamedVector(vector, Integer.toString(((IntWritable) vw.getFirst()).get()));
+        }
+      }
+      Vector pdfPerCluster = clusterClassifier.classify(vector);
+      if (shouldClassify(pdfPerCluster, clusterClassificationThreshold)) {
+        classifyAndWrite(clusterModels, clusterClassificationThreshold, emitMostLikely, writer,
+            new VectorWritable(vector), pdfPerCluster);
+      }
+    }
+    writer.close();
+  }
+
+  private static void classifyAndWrite(List<Cluster> clusterModels, Double clusterClassificationThreshold,
+      boolean emitMostLikely, SequenceFile.Writer writer, VectorWritable vw, Vector pdfPerCluster) throws IOException {
+    Map<Text, Text> props = new HashMap<>();
+    if (emitMostLikely) {
+      int maxValueIndex = pdfPerCluster.maxValueIndex();
+      WeightedPropertyVectorWritable weightedPropertyVectorWritable =
+          new WeightedPropertyVectorWritable(pdfPerCluster.maxValue(), vw.get(), props);
+      write(clusterModels, writer, weightedPropertyVectorWritable, maxValueIndex);
+    } else {
+      writeAllAboveThreshold(clusterModels, clusterClassificationThreshold, writer, vw, pdfPerCluster);
+    }
+  }
+
+  private static void writeAllAboveThreshold(List<Cluster> clusterModels, Double clusterClassificationThreshold,
+      SequenceFile.Writer writer, VectorWritable vw, Vector pdfPerCluster) throws IOException {
+    Map<Text, Text> props = new HashMap<>();
+    for (Element pdf : pdfPerCluster.nonZeroes()) {
+      if (pdf.get() >= clusterClassificationThreshold) {
+        WeightedPropertyVectorWritable wvw = new WeightedPropertyVectorWritable(pdf.get(), vw.get(), props);
+        int clusterIndex = pdf.index();
+        write(clusterModels, writer, wvw, clusterIndex);
+      }
+    }
+  }
+
+  private static void write(List<Cluster> clusterModels, SequenceFile.Writer writer,
+      WeightedPropertyVectorWritable weightedPropertyVectorWritable,
+      int maxValueIndex) throws IOException {
+    Cluster cluster = clusterModels.get(maxValueIndex);
+
+    DistanceMeasureCluster distanceMeasureCluster = (DistanceMeasureCluster) cluster;
+    DistanceMeasure distanceMeasure = distanceMeasureCluster.getMeasure();
+    double distance = distanceMeasure.distance(cluster.getCenter(), weightedPropertyVectorWritable.getVector());
+
+    weightedPropertyVectorWritable.getProperties().put(new Text("distance"), new Text(Double.toString(distance)));
+    writer.append(new IntWritable(cluster.getId()), weightedPropertyVectorWritable);
+  }
+
+  /**
+   * Decides whether the vector should be classified or not, based on the max
+   * pdf value of the clusters and the threshold value.
+   *
+   * @return whether the vector should be classified or not.
+ */ + private static boolean shouldClassify(Vector pdfPerCluster, Double clusterClassificationThreshold) { + return pdfPerCluster.maxValue() >= clusterClassificationThreshold; + } + + private static void classifyClusterMR(Configuration conf, Path input, Path clustersIn, Path output, + Double clusterClassificationThreshold, boolean emitMostLikely) throws IOException, InterruptedException, + ClassNotFoundException { + + conf.setFloat(ClusterClassificationConfigKeys.OUTLIER_REMOVAL_THRESHOLD, + clusterClassificationThreshold.floatValue()); + conf.setBoolean(ClusterClassificationConfigKeys.EMIT_MOST_LIKELY, emitMostLikely); + conf.set(ClusterClassificationConfigKeys.CLUSTERS_IN, clustersIn.toUri().toString()); + + Job job = new Job(conf, "Cluster Classification Driver running over input: " + input); + job.setJarByClass(ClusterClassificationDriver.class); + + job.setInputFormatClass(SequenceFileInputFormat.class); + job.setOutputFormatClass(SequenceFileOutputFormat.class); + + job.setMapperClass(ClusterClassificationMapper.class); + job.setNumReduceTasks(0); + + job.setOutputKeyClass(IntWritable.class); + job.setOutputValueClass(WeightedPropertyVectorWritable.class); + + FileInputFormat.addInputPath(job, input); + FileOutputFormat.setOutputPath(job, output); + if (!job.waitForCompletion(true)) { + throw new InterruptedException("Cluster Classification Driver Job failed processing " + input); + } + } + + public static void run(Configuration conf, Path input, Path clusteringOutputPath, Path output, + double clusterClassificationThreshold, boolean emitMostLikely, boolean runSequential) throws IOException, + InterruptedException, ClassNotFoundException { + if (runSequential) { + classifyClusterSeq(conf, input, clusteringOutputPath, output, clusterClassificationThreshold, emitMostLikely); + } else { + classifyClusterMR(conf, input, clusteringOutputPath, output, clusterClassificationThreshold, emitMostLikely); + } + + } + +}
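A minimal sketch of the outlier-removal decision implemented by shouldClassify(...) and classifyAndWrite(...) above. The class name ThresholdSketch, the pdf values, and the threshold are hypothetical; maxValue() and maxValueIndex() are the same Vector calls the driver itself uses.

import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.Vector;

public class ThresholdSketch {
  public static void main(String[] args) {
    // pdf of a single input vector against three clusters (made-up values)
    Vector pdfPerCluster = new DenseVector(new double[] {0.05, 0.80, 0.15});
    double clusterClassificationThreshold = 0.5;
    // shouldClassify: keep the point only if its best cluster clears the threshold
    boolean keep = pdfPerCluster.maxValue() >= clusterClassificationThreshold; // true
    // emitMostLikely == true: write the point once, under cluster 1 (pdf 0.80)
    int mostLikely = pdfPerCluster.maxValueIndex();                            // 1
    // emitMostLikely == false: write the point under every cluster whose pdf
    // clears the threshold - here that is again only cluster 1
    System.out.println(keep + " " + mostLikely);
  }
}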
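And a sketch of how CanopyDriver and ClusterClassificationDriver might compose end to end, mirroring what clusterData(...) in CanopyDriver does internally. The paths, the T1/T2 values, and the class name CanopyPipelineSketch are placeholders; the run(...) and writePolicy(...) calls use the signatures shown in the diffs above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.mahout.clustering.canopy.CanopyDriver;
import org.apache.mahout.clustering.classify.ClusterClassificationDriver;
import org.apache.mahout.clustering.classify.ClusterClassifier;
import org.apache.mahout.clustering.iterator.CanopyClusteringPolicy;
import org.apache.mahout.common.distance.EuclideanDistanceMeasure;

public class CanopyPipelineSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path vectors = new Path("input/vectors");     // SequenceFile of VectorWritable (placeholder path)
    Path canopyOut = new Path("output/canopies"); // clusters-0-final is written under here
    // Canopy expects T1 > T2: points within T2 of a center are bound to that
    // canopy; points within T1 are added to it but stay eligible for others.
    double t1 = 3.0;
    double t2 = 1.5;
    // Build the canopies only; runClustering == false defers classification.
    CanopyDriver.run(conf, vectors, canopyOut, new EuclideanDistanceMeasure(),
        t1, t2, false, 0.0, true);
    // clusterData(...) above writes a clustering policy next to the canopies
    // before classifying; do the same here.
    ClusterClassifier.writePolicy(new CanopyClusteringPolicy(), canopyOut);
    // Classify every input vector against the canopies; threshold 0.0 keeps
    // all points, emitMostLikely == true writes one cluster id per point.
    ClusterClassificationDriver.run(conf, vectors, canopyOut,
        new Path(canopyOut, "classifiedPoints"), 0.0, true, true);
  }
}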
