Author: smarthi
Date: Mon Dec 23 19:20:30 2013
New Revision: 1553189
URL: http://svn.apache.org/r1553189
Log:
MAHOUT-1358 - The earlier fix for this issue throws a heap space exception for
large datasets during the Mapper phase; a new fix is now in place, plus code cleanup.
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansDriver.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansThread.java
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansDriver.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansDriver.java?rev=1553189&r1=1553188&r2=1553189&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansDriver.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansDriver.java
Mon Dec 23 19:20:30 2013
@@ -407,7 +407,6 @@ public final class StreamingKMeansDriver
* @param output the directory pathname for output points.
* @return 0 on success, -1 on failure.
*/
- @SuppressWarnings("unchecked")
public static int run(Configuration conf, Path input, Path output)
throws IOException, InterruptedException, ClassNotFoundException,
ExecutionException {
log.info("Starting StreamingKMeans clustering for vectors in {}; results
are output to {}",
@@ -455,7 +454,6 @@ public final class StreamingKMeansDriver
return 0;
}
- @SuppressWarnings("unchecked")
public static int runMapReduce(Configuration conf, Path input, Path output)
throws IOException, ClassNotFoundException, InterruptedException {
// Prepare Job for submission.
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansMapper.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansMapper.java?rev=1553189&r1=1553188&r2=1553189&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansMapper.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansMapper.java
Mon Dec 23 19:20:30 2013
@@ -80,7 +80,7 @@ public class StreamingKMeansMapper exten
estimatePoints.add(centroid);
} else if (numPoints == NUM_ESTIMATE_POINTS) {
clusterEstimatePoints();
- }
+ }
} else {
clusterer.cluster(centroid);
}
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansThread.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansThread.java?rev=1553189&r1=1553188&r2=1553189&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansThread.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansThread.java
Mon Dec 23 19:20:30 2013
@@ -21,7 +21,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
-import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -60,28 +59,32 @@ public class StreamingKMeansThread imple
StreamingKMeansDriver.INVALID_DISTANCE_CUTOFF);
Iterator<Centroid> dataPointsIterator = dataPoints.iterator();
- List<Centroid> dataPointsList = Lists.newArrayList();
+
if (estimateDistanceCutoff ==
StreamingKMeansDriver.INVALID_DISTANCE_CUTOFF) {
List<Centroid> estimatePoints =
Lists.newArrayListWithExpectedSize(NUM_ESTIMATE_POINTS);
while (dataPointsIterator.hasNext() && estimatePoints.size() <
NUM_ESTIMATE_POINTS) {
Centroid centroid = dataPointsIterator.next();
estimatePoints.add(centroid);
- dataPointsList.add(centroid);
}
if (log.isInfoEnabled()) {
log.info("Estimated Points: {}", estimatePoints.size());
}
estimateDistanceCutoff =
ClusteringUtils.estimateDistanceCutoff(estimatePoints,
searcher.getDistanceMeasure());
-
- } else {
- Iterators.addAll(dataPointsList, dataPointsIterator);
}
StreamingKMeans streamingKMeans = new StreamingKMeans(searcher,
numClusters, estimateDistanceCutoff);
- for (Centroid aDataPoints : dataPointsList) {
- streamingKMeans.cluster(aDataPoints);
+
+ // datapointsIterator could be empty if no estimate distance was initially provided
+ // hence creating the iterator again here for the clustering
+ if (!dataPointsIterator.hasNext()) {
+ dataPointsIterator = dataPoints.iterator();
}
+
+ while (dataPointsIterator.hasNext()) {
+ streamingKMeans.cluster(dataPointsIterator.next());
+ }
+
streamingKMeans.reindexCentroids();
return streamingKMeans;
}