Author: smarthi
Date: Mon Dec 23 19:20:30 2013
New Revision: 1553189

URL: http://svn.apache.org/r1553189
Log:
MAHOUT-1358 - the earlier fix for this issue threw a heap space exception for
large datasets during the Mapper phase; a new fix is now in place, along with some code cleanup.

Modified:
    
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansDriver.java
    
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansMapper.java
    
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansThread.java

Modified: 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansDriver.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansDriver.java?rev=1553189&r1=1553188&r2=1553189&view=diff
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansDriver.java
 (original)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansDriver.java
 Mon Dec 23 19:20:30 2013
@@ -407,7 +407,6 @@ public final class StreamingKMeansDriver
    * @param output the directory pathname for output points.
    * @return 0 on success, -1 on failure.
    */
-  @SuppressWarnings("unchecked")
   public static int run(Configuration conf, Path input, Path output)
       throws IOException, InterruptedException, ClassNotFoundException, 
ExecutionException {
     log.info("Starting StreamingKMeans clustering for vectors in {}; results 
are output to {}",
@@ -455,7 +454,6 @@ public final class StreamingKMeansDriver
     return 0;
   }
 
-  @SuppressWarnings("unchecked")
   public static int runMapReduce(Configuration conf, Path input, Path output)
     throws IOException, ClassNotFoundException, InterruptedException {
     // Prepare Job for submission.

Modified: 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansMapper.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansMapper.java?rev=1553189&r1=1553188&r2=1553189&view=diff
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansMapper.java
 (original)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansMapper.java
 Mon Dec 23 19:20:30 2013
@@ -80,7 +80,7 @@ public class StreamingKMeansMapper exten
         estimatePoints.add(centroid);
       } else if (numPoints == NUM_ESTIMATE_POINTS) {
         clusterEstimatePoints();
-  }
+      }
     } else {
       clusterer.cluster(centroid);
     }

Modified: 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansThread.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansThread.java?rev=1553189&r1=1553188&r2=1553189&view=diff
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansThread.java
 (original)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/streaming/mapreduce/StreamingKMeansThread.java
 Mon Dec 23 19:20:30 2013
@@ -21,7 +21,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.Callable;
 
-import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -60,28 +59,32 @@ public class StreamingKMeansThread imple
         StreamingKMeansDriver.INVALID_DISTANCE_CUTOFF);
 
     Iterator<Centroid> dataPointsIterator = dataPoints.iterator();
-    List<Centroid> dataPointsList = Lists.newArrayList();
+
     if (estimateDistanceCutoff == 
StreamingKMeansDriver.INVALID_DISTANCE_CUTOFF) {
       List<Centroid> estimatePoints = 
Lists.newArrayListWithExpectedSize(NUM_ESTIMATE_POINTS);
       while (dataPointsIterator.hasNext() && estimatePoints.size() < 
NUM_ESTIMATE_POINTS) {
         Centroid centroid = dataPointsIterator.next();
         estimatePoints.add(centroid);
-        dataPointsList.add(centroid);
       }
 
       if (log.isInfoEnabled()) {
         log.info("Estimated Points: {}", estimatePoints.size());
       }
       estimateDistanceCutoff = 
ClusteringUtils.estimateDistanceCutoff(estimatePoints, 
searcher.getDistanceMeasure());
-
-    } else {
-      Iterators.addAll(dataPointsList, dataPointsIterator);
     }
 
     StreamingKMeans streamingKMeans = new StreamingKMeans(searcher, 
numClusters, estimateDistanceCutoff);
-    for (Centroid aDataPoints : dataPointsList) {
-      streamingKMeans.cluster(aDataPoints);
+
+    // datapointsIterator could be empty if no estimate distance was initially 
provided
+    // hence creating the iterator again here for the clustering
+    if (!dataPointsIterator.hasNext()) {
+      dataPointsIterator = dataPoints.iterator();
     }
+
+    while (dataPointsIterator.hasNext()) {
+      streamingKMeans.cluster(dataPointsIterator.next());
+    }
+
     streamingKMeans.reindexCentroids();
     return streamingKMeans;
   }


Reply via email to