Author: dfilimon
Date: Tue May 21 10:52:25 2013
New Revision: 1484747
URL: http://svn.apache.org/r1484747
Log:
MAHOUT-1223: Fixed point being skipped in StreamingKMeans when iterating
through centroids from a reducer
When calling StreamingKMeans in the reducer (to collapse the number of clusters
so they can fit into memory), the clustering is done on the Hadoop reducer
iterable.
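Currently, the first Centroid is added directly as a special case and then
skipped when iterating through the main loop.
However, Hadoop reducer iterables cannot be rewound: asking the iterable for a
second iterator does not start over from the beginning, so skipping the first
element actually discards the second point, and SKM processes one point too few.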
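A minimal sketch of why the old code lost a point, assuming (as the log message states) that the reducer's iterable cannot be rewound. `OneShotIterable` is a hypothetical stand-in that hands back the same iterator on every `iterator()` call; `buggyScan` imitates `Iterables.get(datapoints, 0)` followed by `Iterables.skip(datapoints, 1)` without depending on Guava, and `fixedScan` mirrors the single-iterator loop this commit introduces. None of these names come from Mahout or Hadoop.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class OneShotIterableDemo {

  /** Hypothetical one-shot iterable: every iterator() call returns the same, already-advancing iterator. */
  static final class OneShotIterable<T> implements Iterable<T> {
    private final Iterator<T> shared;

    OneShotIterable(List<T> values) {
      this.shared = values.iterator();
    }

    @Override
    public Iterator<T> iterator() {
      return shared;
    }
  }

  /** Old pattern: read element 0, then ask for a new iterator and skip one element. */
  static List<Integer> buggyScan(Iterable<Integer> points) {
    List<Integer> seen = new ArrayList<>();
    seen.add(points.iterator().next());         // like Iterables.get(datapoints, 0)
    Iterator<Integer> rest = points.iterator(); // not a fresh iterator for a one-shot iterable
    if (rest.hasNext()) {
      rest.next();                              // like Iterables.skip(datapoints, 1): drops a point here
    }
    while (rest.hasNext()) {
      seen.add(rest.next());
    }
    return seen;
  }

  /** New pattern: obtain one iterator and consume every element from it exactly once. */
  static List<Integer> fixedScan(Iterable<Integer> points) {
    List<Integer> seen = new ArrayList<>();
    Iterator<Integer> it = points.iterator();
    if (!it.hasNext()) {
      return seen;                              // empty input: nothing to process
    }
    seen.add(it.next());                        // the first point seeds the first centroid
    while (it.hasNext()) {
      seen.add(it.next());
    }
    return seen;
  }

  public static void main(String[] args) {
    List<Integer> data = List.of(1, 2, 3, 4);
    System.out.println(buggyScan(new OneShotIterable<>(data))); // [1, 3, 4]
    System.out.println(fixedScan(new OneShotIterable<>(data))); // [1, 2, 3, 4]
    System.out.println(buggyScan(data));                        // [1, 2, 3, 4]: a List can be re-iterated
  }
}
```

The last line shows why the bug only surfaced in the reducer: with an ordinary in-memory `List`, each `iterator()` call starts over, so the skip removes exactly the element that was already consumed.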
Modified:
mahout/trunk/CHANGELOG
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/streaming/cluster/StreamingKMeans.java
Modified: mahout/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/mahout/trunk/CHANGELOG?rev=1484747&r1=1484746&r2=1484747&view=diff
==============================================================================
--- mahout/trunk/CHANGELOG (original)
+++ mahout/trunk/CHANGELOG Tue May 21 10:52:25 2013
@@ -2,6 +2,8 @@ Mahout Change Log
Release 0.8 - unreleased
+__MAHOUT-1223: Fixed point skipped in StreamingKMeans when iterating through centroids from a reducer (dfilimon)
+
__MAHOUT-1222: Fix total weight in FastProjectionSearch (dfilimon)
__MAHOUT-1219: Remove LSHSearcher from StreamingKMeansTest. It causes it to
sometimes fail (dfilimon)
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/streaming/cluster/StreamingKMeans.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/streaming/cluster/StreamingKMeans.java?rev=1484747&r1=1484746&r2=1484747&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/streaming/cluster/StreamingKMeans.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/streaming/cluster/StreamingKMeans.java Tue May 21 10:52:25 2013
@@ -260,6 +260,11 @@ public class StreamingKMeans implements
* @return the UpdatableSearcher containing the resulting centroids.
*/
private UpdatableSearcher clusterInternal(Iterable<Centroid> datapoints, boolean collapseClusters) {
+ Iterator<Centroid> datapointsIterator = datapoints.iterator();
+ if (!datapointsIterator.hasNext()) {
+ return centroids;
+ }
+
int oldNumProcessedDataPoints = numProcessedDatapoints;
// We clear the centroids we have in case of cluster collapse, the old clusters are the
// datapoints but we need to re-cluster them.
@@ -268,19 +273,18 @@ public class StreamingKMeans implements
numProcessedDatapoints = 0;
}
- int numCentroidsToSkip = 0;
if (centroids.size() == 0) {
// Assign the first datapoint to the first cluster.
// Adding a vector to a searcher would normally just reference the copy,
// but we could potentially mutate it and so we need to make a clone.
- centroids.add(Iterables.get(datapoints, 0).clone());
- numCentroidsToSkip = 1;
+ centroids.add(datapointsIterator.next().clone());
++numProcessedDatapoints;
}
// To cluster, we scan the data and either add each point to the nearest group or create a new group.
// when we get too many groups, we need to increase the threshold and rescan our current groups
- for (Centroid row : Iterables.skip(datapoints, numCentroidsToSkip)) {
+ while (datapointsIterator.hasNext()) {
+ Centroid row = datapointsIterator.next();
// Get the closest vector and its weight as a WeightedThing<Vector>.
// The weight of the WeightedThing is the distance to the query and the value is a
// reference to one of the vectors we added to the searcher previously.
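The comment in `clusterInternal` describes the scan: each point either joins the nearest group or starts a new one, and when there are too many groups the threshold is raised and the current groups are re-clustered. Below is a hypothetical, heavily simplified 1-D sketch of that loop written in the post-fix style (one iterator, early return on empty input, first point seeds the first centroid). It assumes a deterministic distance cutoff and a linear nearest-centroid scan instead of the real StreamingKMeans decision rule and searcher, which it does not reproduce; `ThresholdScanSketch`, its `Centroid`, `distanceCutoff`, `maxClusters` and `beta` are illustrative names, not Mahout's API.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical, simplified 1-D illustration of a single-pass threshold scan. */
public class ThresholdScanSketch {

  /** Hypothetical weighted 1-D centroid: a mean plus the total weight merged into it. */
  static final class Centroid {
    double mean;
    double weight;

    Centroid(double mean, double weight) {
      this.mean = mean;
      this.weight = weight;
    }

    Centroid copy() {
      return new Centroid(mean, weight);
    }

    /** Folds another weighted point into this centroid using a weighted mean. */
    void merge(Centroid other) {
      double total = weight + other.weight;
      mean = (mean * weight + other.mean * other.weight) / total;
      weight = total;
    }
  }

  private double distanceCutoff; // assumed > 0 so that growing it by beta eventually merges everything
  private final int maxClusters;
  private final double beta;     // assumed > 1: growth factor applied to the cutoff

  ThresholdScanSketch(double distanceCutoff, int maxClusters, double beta) {
    this.distanceCutoff = distanceCutoff;
    this.maxClusters = maxClusters;
    this.beta = beta;
  }

  /** Consumes the input through exactly one iterator, so one-shot iterables lose no points. */
  List<Centroid> cluster(Iterable<Centroid> datapoints) {
    List<Centroid> centroids = new ArrayList<>();
    Iterator<Centroid> it = datapoints.iterator();
    if (!it.hasNext()) {
      return centroids;
    }
    centroids.add(it.next().copy()); // the first point seeds the first centroid (copied, not aliased)
    while (it.hasNext()) {
      Centroid row = it.next();
      // Linear scan for the nearest existing centroid.
      Centroid nearest = null;
      double best = Double.POSITIVE_INFINITY;
      for (Centroid c : centroids) {
        double d = Math.abs(c.mean - row.mean);
        if (d < best) {
          best = d;
          nearest = c;
        }
      }
      // Deterministic simplification: merge if close enough, otherwise open a new centroid.
      if (best <= distanceCutoff) {
        nearest.merge(row);
      } else {
        centroids.add(row.copy());
      }
      // Too many groups: raise the threshold and re-cluster the current centroids (a List, so re-iterable).
      if (centroids.size() > maxClusters) {
        distanceCutoff *= beta;
        centroids = cluster(new ArrayList<>(centroids));
      }
    }
    return centroids;
  }
}
```

For example, the points 0, 0.5, 10, 10.5 and 20 (each of weight 1) with `distanceCutoff = 1.0`, `maxClusters = 5` and `beta = 2.0` end up in three centroids whose weights sum to 5. Feeding the same points through an iterable that always returns one shared iterator gives the same total weight, because the loop never asks for a second iterator.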