Author: jeastman
Date: Thu Mar  8 22:27:28 2012
New Revision: 1298625

URL: http://svn.apache.org/viewvc?rev=1298625&view=rev
Log:
MAHOUT-933:
- refactored ClusteringPolicies into a hierarchy under the new 
AbstractClusteringPolicy
- added close() to ClusteringPolicy to allow policy-specific actions needed to 
compute convergence
- removed ClusteringPolicy from ClusterIterator constructor as 
ClusterClassifier already has one
- added convergence computations for kmeans and fuzzyk
- added a final renaming of clustersOut to append the -final suffix
- updated Display examples and unit tests to reflect above
- all tests run

I think it is time to begin refactoring the buildClusters methods of the 
respective clustering drivers to use ClusterIterator as it seems to be 
producing equivalent results to the original implementations. This will involve 
removing a lot of existing driver, mapper and reducer code and many 
time-consuming unit tests. It will also have some impact on other components as 
the representation of clusters in the file system changes from Cluster to 
self-describing ClusterWritable.

Added:
    
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/AbstractClusteringPolicy.java
   (with props)
Modified:
    
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassifier.java
    
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/CIReducer.java
    
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/CanopyClusteringPolicy.java
    
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/ClusterIterator.java
    
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/ClusteringPolicy.java
    
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/DirichletClusteringPolicy.java
    
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/FuzzyKMeansClusteringPolicy.java
    
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/KMeansClusteringPolicy.java
    
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/MeanShiftClusteringPolicy.java
    
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/Kluster.java
    
mahout/trunk/core/src/test/java/org/apache/mahout/clustering/iterator/TestClusterClassifier.java
    
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/ClustersFilter.java
    
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayClustering.java
    
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayDirichlet.java
    
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayFuzzyKMeans.java
    
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayKMeans.java
    
mahout/trunk/examples/src/test/java/org/apache/mahout/clustering/display/ClustersFilterTest.java

Modified: 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassifier.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassifier.java?rev=1298625&r1=1298624&r2=1298625&view=diff
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassifier.java
 (original)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/classify/ClusterClassifier.java
 Thu Mar  8 22:27:28 2012
@@ -45,17 +45,27 @@ import com.google.common.collect.Lists;
 import com.google.common.io.Closeables;
 
 /**
- * This classifier works with any clustering Cluster. It is initialized with a
- * list of compatible clusters and thereafter it can classify any new Vector
- * into one or more of the clusters based upon the pdf() function which each
- * cluster supports.
+ * This classifier works with any ClusteringPolicy and its associated Clusters.
+ * It is initialized with a policy and a list of compatible clusters and
+ * thereafter it can classify any new Vector into one or more of the clusters
+ * based upon the pdf() function which each cluster supports.
  * 
  * In addition, it is an OnlineLearner and can be trained. Training amounts to
  * asking the actual model to observe the vector and closing the classifier
  * causes all the models to computeParameters.
+ * 
+ * Because a ClusterClassifier implements Writable, it can be written-to and
+ * read-from a sequence file as a single entity. For sequential and mapreduce
+ * clustering in conjunction with a ClusterIterator; however, it utilizes an
+ * exploded file format. In this format, the iterator writes the policy to a
+ * single POLICY_FILE_NAME file in the clustersOut directory and the models are
+ * written to one or more part-n files so that multiple reducers may employed 
to
+ * produce them.
  */
 public class ClusterClassifier extends AbstractVectorClassifier implements 
OnlineLearner, Writable {
   
+  private static final String POLICY_FILE_NAME = "_policy";
+  
   private List<Cluster> models;
   
   private String modelClass;
@@ -86,7 +96,7 @@ public class ClusterClassifier extends A
   
   @Override
   public Vector classify(Vector instance) {
-    return policy.classify(instance, models);
+    return policy.classify(instance, this);
   }
   
   @Override
@@ -160,9 +170,7 @@ public class ClusterClassifier extends A
   
   @Override
   public void close() {
-    for (Cluster cluster : models) {
-      cluster.computeParameters();
-    }
+    policy.close(this);
   }
   
   public List<Cluster> getModels() {
@@ -207,7 +215,7 @@ public class ClusterClassifier extends A
   }
   
   public static ClusteringPolicy readPolicy(Path path) throws IOException {
-    Path policyPath = new Path(path, "_policy");
+    Path policyPath = new Path(path, POLICY_FILE_NAME);
     Configuration config = new Configuration();
     FileSystem fs = FileSystem.get(policyPath.toUri(), config);
     SequenceFile.Reader reader = new SequenceFile.Reader(fs, policyPath, 
config);
@@ -218,7 +226,7 @@ public class ClusterClassifier extends A
   }
   
   public static void writePolicy(ClusteringPolicy policy, Path path) throws 
IOException {
-    Path policyPath = new Path(path, "_policy");
+    Path policyPath = new Path(path, POLICY_FILE_NAME);
     Configuration config = new Configuration();
     FileSystem fs = FileSystem.get(policyPath.toUri(), config);
     SequenceFile.Writer writer = new SequenceFile.Writer(fs, config, 
policyPath, Text.class,

Added: 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/AbstractClusteringPolicy.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/AbstractClusteringPolicy.java?rev=1298625&view=auto
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/AbstractClusteringPolicy.java
 (added)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/AbstractClusteringPolicy.java
 Thu Mar  8 22:27:28 2012
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.iterator;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.classify.ClusterClassifier;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.SequentialAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.function.TimesFunction;
+
+public abstract class AbstractClusteringPolicy implements ClusteringPolicy {
+  
+  @Override
+  public abstract void write(DataOutput out) throws IOException;
+  
+  @Override
+  public abstract void readFields(DataInput in) throws IOException;
+  
+  @Override
+  public Vector select(Vector probabilities) {
+    int maxValueIndex = probabilities.maxValueIndex();
+    Vector weights = new SequentialAccessSparseVector(probabilities.size());
+    weights.set(maxValueIndex, 1.0);
+    return weights;
+  }
+  
+  @Override
+  public void update(ClusterClassifier posterior) {
+    // nothing to do in general here
+  }
+  
+  @Override
+  public Vector classify(Vector data, ClusterClassifier prior) {
+    List<Cluster> models = prior.getModels();
+    int i = 0;
+    Vector pdfs = new DenseVector(models.size());
+    for (Cluster model : models) {
+      pdfs.set(i++, model.pdf(new VectorWritable(data)));
+    }
+    return pdfs.assign(new TimesFunction(), 1.0 / pdfs.zSum());
+  }
+  
+  @Override
+  public void close(ClusterClassifier posterior) {
+    for (Cluster cluster : posterior.getModels()) {
+      cluster.computeParameters();
+    }
+    
+  }
+  
+}

Propchange: 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/AbstractClusteringPolicy.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/CIReducer.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/CIReducer.java?rev=1298625&r1=1298624&r2=1298625&view=diff
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/CIReducer.java
 (original)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/CIReducer.java
 Thu Mar  8 22:27:28 2012
@@ -18,13 +18,21 @@
 package org.apache.mahout.clustering.iterator;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.classify.ClusterClassifier;
 
 public class CIReducer extends 
Reducer<IntWritable,ClusterWritable,IntWritable,ClusterWritable> {
   
+  private ClusterClassifier classifier;
+  private ClusteringPolicy policy;
+  
   @Override
   protected void reduce(IntWritable key, Iterable<ClusterWritable> values, 
Context context) throws IOException,
       InterruptedException {
@@ -38,8 +46,28 @@ public class CIReducer extends Reducer<I
         first.getValue().observe(cw.getValue());
       }
     }
-    first.getValue().computeParameters();
+    List<Cluster> models = new ArrayList<Cluster>();
+    models.add(first.getValue());
+    classifier = new ClusterClassifier(models, policy);
+    classifier.close();
     context.write(key, first);
   }
   
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * 
org.apache.hadoop.mapreduce.Mapper#setup(org.apache.hadoop.mapreduce.Mapper
+   * .Context)
+   */
+  @Override
+  protected void setup(Context context) throws IOException, 
InterruptedException {
+    String priorClustersPath = 
context.getConfiguration().get(ClusterIterator.PRIOR_PATH_KEY);
+    classifier = new ClusterClassifier();
+    classifier.readFromSeqFiles(new Path(priorClustersPath));
+    policy = classifier.getPolicy();
+    policy.update(classifier);
+    super.setup(context);
+  }
+  
 }

Modified: 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/CanopyClusteringPolicy.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/CanopyClusteringPolicy.java?rev=1298625&r1=1298624&r2=1298625&view=diff
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/CanopyClusteringPolicy.java
 (original)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/CanopyClusteringPolicy.java
 Thu Mar  8 22:27:28 2012
@@ -19,22 +19,16 @@ package org.apache.mahout.clustering.ite
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.List;
 
-import org.apache.mahout.clustering.Cluster;
-import org.apache.mahout.clustering.classify.ClusterClassifier;
-import org.apache.mahout.math.DenseVector;
 import org.apache.mahout.math.SequentialAccessSparseVector;
 import org.apache.mahout.math.Vector;
-import org.apache.mahout.math.VectorWritable;
-import org.apache.mahout.math.function.TimesFunction;
 
 /**
  * This is a simple maximum likelihood clustering policy, suitable for k-means
  * clustering
  * 
  */
-public class CanopyClusteringPolicy implements ClusteringPolicy {
+public class CanopyClusteringPolicy extends AbstractClusteringPolicy {
   
   public CanopyClusteringPolicy() {
     super();
@@ -60,28 +54,6 @@ public class CanopyClusteringPolicy impl
   /*
    * (non-Javadoc)
    * 
-   * @see
-   * org.apache.mahout.clustering.ClusteringPolicy#update(org.apache.mahout.
-   * clustering.ClusterClassifier)
-   */
-  @Override
-  public void update(ClusterClassifier posterior) {
-    // nothing to do here
-  }
-  
-  @Override
-  public Vector classify(Vector data, List<Cluster> models) {
-    int i = 0;
-    Vector pdfs = new DenseVector(models.size());
-    for (Cluster model : models) {
-      pdfs.set(i++, model.pdf(new VectorWritable(data)));
-    }
-    return pdfs.assign(new TimesFunction(), 1.0 / pdfs.zSum());
-  }
-  
-  /*
-   * (non-Javadoc)
-   * 
    * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
    */
   @Override

Modified: 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/ClusterIterator.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/ClusterIterator.java?rev=1298625&r1=1298624&r2=1298625&view=diff
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/ClusterIterator.java
 (original)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/ClusterIterator.java
 Thu Mar  8 22:27:28 2012
@@ -29,6 +29,7 @@ import org.apache.hadoop.mapreduce.lib.i
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.mahout.clustering.Cluster;
 import org.apache.mahout.clustering.classify.ClusterClassifier;
 import org.apache.mahout.common.HadoopUtil;
 import org.apache.mahout.common.iterator.sequencefile.PathFilters;
@@ -50,25 +51,24 @@ import com.google.common.io.Closeables;
 public class ClusterIterator {
   
   public static final String PRIOR_PATH_KEY = 
"org.apache.mahout.clustering.prior.path";
-  public ClusterIterator(ClusteringPolicy policy) {
-    this.policy = policy;
-  }
-  
-  private final ClusteringPolicy policy;
   
   /**
    * Iterate over data using a prior-trained ClusterClassifier, for a number of
    * iterations
    * 
+   * @param policy
+   *          the ClusteringPolicy to use
    * @param data
    *          a {@code List<Vector>} of input vectors
    * @param classifier
    *          a prior ClusterClassifier
    * @param numIterations
    *          the int number of iterations to perform
+   * 
    * @return the posterior ClusterClassifier
    */
   public ClusterClassifier iterate(Iterable<Vector> data, ClusterClassifier 
classifier, int numIterations) {
+    ClusteringPolicy policy = classifier.getPolicy();
     for (int iteration = 1; iteration <= numIterations; iteration++) {
       for (Vector vector : data) {
         // update the policy based upon the prior
@@ -101,20 +101,24 @@ public class ClusterIterator {
    *          a Path of output directory
    * @param numIterations
    *          the int number of iterations to perform
+   * 
    * @throws IOException
    */
   public void iterateSeq(Path inPath, Path priorPath, Path outPath, int 
numIterations) throws IOException {
     ClusterClassifier classifier = new ClusterClassifier();
     classifier.readFromSeqFiles(priorPath);
     Configuration conf = new Configuration();
-    for (int iteration = 1; iteration <= numIterations; iteration++) {
+    HadoopUtil.delete(conf, outPath);
+    Path clustersOut = null;
+    int iteration = 1;
+    while (iteration <= numIterations) {
       for (VectorWritable vw : new 
SequenceFileDirValueIterable<VectorWritable>(inPath, PathType.LIST,
           PathFilters.logsCRCFilter(), conf)) {
         Vector vector = vw.get();
         // classification yields probabilities
         Vector probabilities = classifier.classify(vector);
         // policy selects weights for models given those probabilities
-        Vector weights = policy.select(probabilities);
+        Vector weights = classifier.getPolicy().select(probabilities);
         // training causes all models to observe data
         for (Iterator<Vector.Element> it = weights.iterateNonZero(); 
it.hasNext();) {
           int index = it.next().index();
@@ -124,10 +128,18 @@ public class ClusterIterator {
       // compute the posterior models
       classifier.close();
       // update the policy
-      policy.update(classifier);
+      classifier.getPolicy().update(classifier);
       // output the classifier
-      classifier.writeToSeqFiles(new Path(outPath, "classifier-" + iteration));
+      clustersOut = new Path(outPath, Cluster.CLUSTERS_DIR + iteration);
+      classifier.writeToSeqFiles(clustersOut);
+      FileSystem fs = FileSystem.get(outPath.toUri(), conf);
+      iteration++;
+      if (isConverged(clustersOut, conf, fs)) {
+        break;
+      }
     }
+    Path finalClustersIn = new Path(outPath, Cluster.CLUSTERS_DIR + (iteration 
- 1) + Cluster.FINAL_ITERATION_SUFFIX);
+    FileSystem.get(clustersOut.toUri(), conf).rename(clustersOut, 
finalClustersIn);
   }
   
   /**
@@ -147,7 +159,10 @@ public class ClusterIterator {
       InterruptedException, ClassNotFoundException {
     Configuration conf = new Configuration();
     HadoopUtil.delete(conf, outPath);
-    for (int iteration = 1; iteration <= numIterations; iteration++) {
+    ClusteringPolicy policy = ClusterClassifier.readPolicy(priorPath);
+    Path clustersOut = null;
+    int iteration = 1;
+    while (iteration <= numIterations) {
       conf.set(PRIOR_PATH_KEY, priorPath.toString());
       
       String jobName = "Cluster Iterator running iteration " + iteration + " 
over priorPath: " + priorPath;
@@ -164,7 +179,7 @@ public class ClusterIterator {
       job.setReducerClass(CIReducer.class);
       
       FileInputFormat.addInputPath(job, inPath);
-      Path clustersOut = new Path(outPath, "clusters-" + iteration);
+      clustersOut = new Path(outPath, Cluster.CLUSTERS_DIR + iteration);
       priorPath = clustersOut;
       FileOutputFormat.setOutputPath(job, clustersOut);
       
@@ -174,10 +189,13 @@ public class ClusterIterator {
       }
       ClusterClassifier.writePolicy(policy, clustersOut);
       FileSystem fs = FileSystem.get(outPath.toUri(), conf);
+      iteration++;
       if (isConverged(clustersOut, conf, fs)) {
         break;
       }
     }
+    Path finalClustersIn = new Path(outPath, Cluster.CLUSTERS_DIR + (iteration 
- 1) + Cluster.FINAL_ITERATION_SUFFIX);
+    FileSystem.get(clustersOut.toUri(), conf).rename(clustersOut, 
finalClustersIn);
   }
   
   /**

Modified: 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/ClusteringPolicy.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/ClusteringPolicy.java?rev=1298625&r1=1298624&r2=1298625&view=diff
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/ClusteringPolicy.java
 (original)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/ClusteringPolicy.java
 Thu Mar  8 22:27:28 2012
@@ -16,10 +16,7 @@
  */
 package org.apache.mahout.clustering.iterator;
 
-import java.util.List;
-
 import org.apache.hadoop.io.Writable;
-import org.apache.mahout.clustering.Cluster;
 import org.apache.mahout.clustering.classify.ClusterClassifier;
 import org.apache.mahout.math.Vector;
 
@@ -30,13 +27,25 @@ import org.apache.mahout.math.Vector;
 public interface ClusteringPolicy extends Writable {
   
   /**
+   * Classify the data vector given the classifier's models
+   * 
+   * @param data
+   *          a data Vector
+   * @param prior
+   *          a prior ClusterClassifier
+   * @return a Vector of probabilities that the data is described by each of 
the
+   *         models
+   */
+  public Vector classify(Vector data, ClusterClassifier prior);
+  
+  /**
    * Return a vector of weights for each of the models given those 
probabilities
    * 
    * @param probabilities
    *          a Vector of pdfs
    * @return a Vector of weights
    */
-  Vector select(Vector probabilities);
+  public Vector select(Vector probabilities);
   
   /**
    * Update the policy with the given classifier
@@ -44,16 +53,14 @@ public interface ClusteringPolicy extend
    * @param posterior
    *          a ClusterClassifier
    */
-  void update(ClusterClassifier posterior);
+  public void update(ClusterClassifier posterior);
   
   /**
-   * @param data
-   *          a data Vector
-   * @param models
-   *          a list of Cluster models
-   * @return a Vector of probabilities that the data is described by each of 
the
-   *         models
+   * Close the policy using the classifier's models
+   * 
+   * @param posterior
+   *          a posterior ClusterClassifier
    */
-  Vector classify(Vector data, List<Cluster> models);
+  public void close(ClusterClassifier posterior);
   
 }
\ No newline at end of file

Modified: 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/DirichletClusteringPolicy.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/DirichletClusteringPolicy.java?rev=1298625&r1=1298624&r2=1298625&view=diff
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/DirichletClusteringPolicy.java
 (original)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/DirichletClusteringPolicy.java
 Thu Mar  8 22:27:28 2012
@@ -19,18 +19,15 @@ package org.apache.mahout.clustering.ite
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.List;
 
-import org.apache.mahout.clustering.Cluster;
 import org.apache.mahout.clustering.classify.ClusterClassifier;
 import org.apache.mahout.clustering.dirichlet.UncommonDistributions;
 import org.apache.mahout.math.DenseVector;
 import org.apache.mahout.math.SequentialAccessSparseVector;
 import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.VectorWritable;
-import org.apache.mahout.math.function.TimesFunction;
 
-public class DirichletClusteringPolicy implements ClusteringPolicy {
+public class DirichletClusteringPolicy extends AbstractClusteringPolicy {
   
   public DirichletClusteringPolicy() {
     super();
@@ -80,16 +77,6 @@ public class DirichletClusteringPolicy i
     mixture = UncommonDistributions.rDirichlet(totalCounts, alpha0);
   }
   
-  @Override
-  public Vector classify(Vector data, List<Cluster> models) {
-    int i = 0;
-    Vector pdfs = new DenseVector(models.size());
-    for (Cluster model : models) {
-      pdfs.set(i++, model.pdf(new VectorWritable(data)));
-    }
-    return pdfs.assign(new TimesFunction(), 1.0 / pdfs.zSum());
-  }
-
   /*
    * (non-Javadoc)
    * 

Modified: 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/FuzzyKMeansClusteringPolicy.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/FuzzyKMeansClusteringPolicy.java?rev=1298625&r1=1298624&r2=1298625&view=diff
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/FuzzyKMeansClusteringPolicy.java
 (original)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/FuzzyKMeansClusteringPolicy.java
 Thu Mar  8 22:27:28 2012
@@ -35,7 +35,7 @@ import com.google.common.collect.Lists;
  * clustering
  * 
  */
-public class FuzzyKMeansClusteringPolicy implements ClusteringPolicy {
+public class FuzzyKMeansClusteringPolicy extends AbstractClusteringPolicy {
   
   public FuzzyKMeansClusteringPolicy() {
     super();
@@ -53,18 +53,6 @@ public class FuzzyKMeansClusteringPolicy
    * (non-Javadoc)
    * 
    * @see
-   * org.apache.mahout.clustering.ClusteringPolicy#update(org.apache.mahout.
-   * clustering.ClusterClassifier)
-   */
-  @Override
-  public void update(ClusterClassifier posterior) {
-    // nothing to do here
-  }
-  
-  /*
-   * (non-Javadoc)
-   * 
-   * @see
    * org.apache.mahout.clustering.ClusteringPolicy#select(org.apache.mahout.
    * math.Vector)
    */
@@ -74,10 +62,10 @@ public class FuzzyKMeansClusteringPolicy
   }
   
   @Override
-  public Vector classify(Vector data, List<Cluster> models) {
+  public Vector classify(Vector data, ClusterClassifier prior) {
     Collection<SoftCluster> clusters = Lists.newArrayList();
     List<Double> distances = Lists.newArrayList();
-    for (Cluster model : models) {
+    for (Cluster model : prior.getModels()) {
       SoftCluster sc = (SoftCluster) model;
       clusters.add(sc);
       distances.add(sc.getMeasure().distance(data, sc.getCenter()));
@@ -86,7 +74,7 @@ public class FuzzyKMeansClusteringPolicy
     fuzzyKMeansClusterer.setM(m);
     return fuzzyKMeansClusterer.computePi(clusters, distances);
   }
-
+  
   /*
    * (non-Javadoc)
    * 
@@ -108,5 +96,14 @@ public class FuzzyKMeansClusteringPolicy
     this.m = in.readDouble();
     this.convergenceDelta = in.readDouble();
   }
+
+  @Override
+  public void close(ClusterClassifier posterior) {
+    for (Cluster cluster : posterior.getModels()) {
+      ((org.apache.mahout.clustering.kmeans.Kluster) 
cluster).calculateConvergence(convergenceDelta);
+      cluster.computeParameters();
+    }
+    
+  }
   
 }

Modified: 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/KMeansClusteringPolicy.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/KMeansClusteringPolicy.java?rev=1298625&r1=1298624&r2=1298625&view=diff
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/KMeansClusteringPolicy.java
 (original)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/KMeansClusteringPolicy.java
 Thu Mar  8 22:27:28 2012
@@ -19,72 +19,41 @@ package org.apache.mahout.clustering.ite
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.List;
 
 import org.apache.mahout.clustering.Cluster;
 import org.apache.mahout.clustering.classify.ClusterClassifier;
-import org.apache.mahout.math.DenseVector;
-import org.apache.mahout.math.SequentialAccessSparseVector;
-import org.apache.mahout.math.Vector;
-import org.apache.mahout.math.VectorWritable;
-import org.apache.mahout.math.function.TimesFunction;
 
 /**
  * This is a simple maximum likelihood clustering policy, suitable for k-means
  * clustering
  * 
  */
-public class KMeansClusteringPolicy implements ClusteringPolicy {
+public class KMeansClusteringPolicy extends AbstractClusteringPolicy {
   
   public KMeansClusteringPolicy() {
     super();
   }
-
+  
   public KMeansClusteringPolicy(double convergenceDelta) {
     super();
     this.convergenceDelta = convergenceDelta;
   }
-
-  private double convergenceDelta = 0.05;
   
-  /* (non-Javadoc)
-   * @see 
org.apache.mahout.clustering.ClusteringPolicy#select(org.apache.mahout.math.Vector)
-   */
-  @Override
-  public Vector select(Vector probabilities) {
-    int maxValueIndex = probabilities.maxValueIndex();
-    Vector weights = new SequentialAccessSparseVector(probabilities.size());
-    weights.set(maxValueIndex, 1.0);
-    return weights;
-  }
+  private double convergenceDelta = 0.001;
   
-  /* (non-Javadoc)
-   * @see 
org.apache.mahout.clustering.ClusteringPolicy#update(org.apache.mahout.clustering.ClusterClassifier)
-   */
-  @Override
-  public void update(ClusterClassifier posterior) {
-    // nothing to do here
-  }
-
-  @Override
-  public Vector classify(Vector data, List<Cluster> models) {
-    int i = 0;
-    Vector pdfs = new DenseVector(models.size());
-    for (Cluster model : models) {
-      pdfs.set(i++, model.pdf(new VectorWritable(data)));
-    }
-    return pdfs.assign(new TimesFunction(), 1.0 / pdfs.zSum());
-  }
-
-  /* (non-Javadoc)
+  /*
+   * (non-Javadoc)
+   * 
    * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
    */
   @Override
   public void write(DataOutput out) throws IOException {
     out.writeDouble(convergenceDelta);
   }
-
-  /* (non-Javadoc)
+  
+  /*
+   * (non-Javadoc)
+   * 
    * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
    */
   @Override
@@ -92,4 +61,16 @@ public class KMeansClusteringPolicy impl
     this.convergenceDelta = in.readDouble();
   }
   
+  @Override
+  public void close(ClusterClassifier posterior) {
+    boolean allConverged = true;
+    for (Cluster cluster : posterior.getModels()) {
+      org.apache.mahout.clustering.kmeans.Kluster kluster = 
(org.apache.mahout.clustering.kmeans.Kluster) cluster;
+      boolean converged = kluster.calculateConvergence(convergenceDelta);
+      allConverged = allConverged && converged;
+      cluster.computeParameters();
+    }
+    
+  }
+  
 }

Modified: 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/MeanShiftClusteringPolicy.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/MeanShiftClusteringPolicy.java?rev=1298625&r1=1298624&r2=1298625&view=diff
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/MeanShiftClusteringPolicy.java
 (original)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/iterator/MeanShiftClusteringPolicy.java
 Thu Mar  8 22:27:28 2012
@@ -19,22 +19,13 @@ package org.apache.mahout.clustering.ite
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.List;
-
-import org.apache.mahout.clustering.Cluster;
-import org.apache.mahout.clustering.classify.ClusterClassifier;
-import org.apache.mahout.math.DenseVector;
-import org.apache.mahout.math.SequentialAccessSparseVector;
-import org.apache.mahout.math.Vector;
-import org.apache.mahout.math.VectorWritable;
-import org.apache.mahout.math.function.TimesFunction;
 
 /**
  * This is a simple maximum likelihood clustering policy, suitable for k-means
  * clustering
  * 
  */
-public class MeanShiftClusteringPolicy implements ClusteringPolicy {
+public class MeanShiftClusteringPolicy extends AbstractClusteringPolicy {
   
   public MeanShiftClusteringPolicy() {
     super();
@@ -45,43 +36,6 @@ public class MeanShiftClusteringPolicy i
   /*
    * (non-Javadoc)
    * 
-   * @see
-   * org.apache.mahout.clustering.ClusteringPolicy#select(org.apache.mahout.
-   * math.Vector)
-   */
-  @Override
-  public Vector select(Vector probabilities) {
-    int maxValueIndex = probabilities.maxValueIndex();
-    Vector weights = new SequentialAccessSparseVector(probabilities.size());
-    weights.set(maxValueIndex, 1.0);
-    return weights;
-  }
-  
-  /*
-   * (non-Javadoc)
-   * 
-   * @see
-   * org.apache.mahout.clustering.ClusteringPolicy#update(org.apache.mahout.
-   * clustering.ClusterClassifier)
-   */
-  @Override
-  public void update(ClusterClassifier posterior) {
-    // nothing to do here
-  }
-  
-  @Override
-  public Vector classify(Vector data, List<Cluster> models) {
-    int i = 0;
-    Vector pdfs = new DenseVector(models.size());
-    for (Cluster model : models) {
-      pdfs.set(i++, model.pdf(new VectorWritable(data)));
-    }
-    return pdfs.assign(new TimesFunction(), 1.0 / pdfs.zSum());
-  }
-  
-  /*
-   * (non-Javadoc)
-   * 
    * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
    */
   @Override

Modified: 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/Kluster.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/Kluster.java?rev=1298625&r1=1298624&r2=1298625&view=diff
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/Kluster.java
 (original)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/Kluster.java
 Thu Mar  8 22:27:28 2012
@@ -25,28 +25,30 @@ import org.apache.mahout.common.distance
 import org.apache.mahout.math.Vector;
 
 public class Kluster extends DistanceMeasureCluster {
-
+  
   /** Has the centroid converged with the center? */
   private boolean converged;
-
+  
   /** For (de)serialization as a Writable */
-  public Kluster() {
-  }
-
+  public Kluster() {}
+  
   /**
    * Construct a new cluster with the given point as its center
-   *
-   * @param center  the Vector center
-   * @param clusterId the int cluster id
-   * @param measure a DistanceMeasure
+   * 
+   * @param center
+   *          the Vector center
+   * @param clusterId
+   *          the int cluster id
+   * @param measure
+   *          a DistanceMeasure
    */
   public Kluster(Vector center, int clusterId, DistanceMeasure measure) {
     super(center, clusterId, measure);
   }
-
+  
   /**
    * Format the cluster for output
-   *
+   * 
    * @param cluster
    *          the Cluster
    * @return the String representation of the Cluster
@@ -54,33 +56,33 @@ public class Kluster extends DistanceMea
   public static String formatCluster(Kluster cluster) {
     return cluster.getIdentifier() + ": " + 
cluster.computeCentroid().asFormatString();
   }
-
+  
   public String asFormatString() {
     return formatCluster(this);
   }
-
+  
   @Override
   public void write(DataOutput out) throws IOException {
     super.write(out);
     out.writeBoolean(converged);
   }
-
+  
   @Override
   public void readFields(DataInput in) throws IOException {
     super.readFields(in);
     this.converged = in.readBoolean();
   }
-
+  
   @Override
   public String toString() {
     return asFormatString(null);
   }
-
+  
   @Override
   public String getIdentifier() {
     return (converged ? "VL-" : "CL-") + getId();
   }
-
+  
   /**
    * Return if the cluster is converged by comparing its center and centroid.
    * 
@@ -95,14 +97,20 @@ public class Kluster extends DistanceMea
     converged = measure.distance(centroid.getLengthSquared(), centroid, 
getCenter()) <= convergenceDelta;
     return converged;
   }
-
+  
   @Override
   public boolean isConverged() {
     return converged;
   }
-
+  
   protected void setConverged(boolean converged) {
     this.converged = converged;
   }
-
+  
+  public boolean calculateConvergence(double convergenceDelta) { // True when the computed centroid is within convergenceDelta of the center, using getMeasure().
+    Vector centroid = computeCentroid(); // centroid is computed once and reused for both distance arguments
+    converged = getMeasure().distance(centroid.getLengthSquared(), centroid, 
getCenter()) <= convergenceDelta; // side effect: the outcome is stored in the converged field (boundary is inclusive)
+    return converged; // NOTE(review): same formula as the measure-taking statement earlier in this class - presumably a duplicate, confirm
+  }
+  
 }

Modified: 
mahout/trunk/core/src/test/java/org/apache/mahout/clustering/iterator/TestClusterClassifier.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/iterator/TestClusterClassifier.java?rev=1298625&r1=1298624&r2=1298625&view=diff
==============================================================================
--- 
mahout/trunk/core/src/test/java/org/apache/mahout/clustering/iterator/TestClusterClassifier.java
 (original)
+++ 
mahout/trunk/core/src/test/java/org/apache/mahout/clustering/iterator/TestClusterClassifier.java
 Thu Mar  8 22:27:28 2012
@@ -195,9 +195,8 @@ public final class TestClusterClassifier
   @Test
   public void testClusterIteratorKMeans() {
     List<Vector> data = 
TestKmeansClustering.getPoints(TestKmeansClustering.REFERENCE);
-    ClusteringPolicy policy = new KMeansClusteringPolicy();
     ClusterClassifier prior = newKlusterClassifier();
-    ClusterIterator iterator = new ClusterIterator(policy);
+    ClusterIterator iterator = new ClusterIterator();
     ClusterClassifier posterior = iterator.iterate(data, prior, 5);
     assertEquals(3, posterior.getModels().size());
     for (Cluster cluster : posterior.getModels()) {
@@ -208,9 +207,8 @@ public final class TestClusterClassifier
   @Test
   public void testClusterIteratorDirichlet() {
     List<Vector> data = 
TestKmeansClustering.getPoints(TestKmeansClustering.REFERENCE);
-    ClusteringPolicy policy = new DirichletClusteringPolicy(3, 1);
     ClusterClassifier prior = newKlusterClassifier();
-    ClusterIterator iterator = new ClusterIterator(policy);
+    ClusterIterator iterator = new ClusterIterator();
     ClusterClassifier posterior = iterator.iterate(data, prior, 5);
     assertEquals(3, posterior.getModels().size());
     for (Cluster cluster : posterior.getModels()) {
@@ -235,13 +233,13 @@ public final class TestClusterClassifier
     for (Cluster cluster : prior.getModels()) {
       System.out.println(cluster.asFormatString(null));
     }
-    ClusterIterator iterator = new ClusterIterator(prior.getPolicy());
-    iterator.iterateSeq(pointsPath, path, outPath, 5);
+    new ClusterIterator().iterateSeq(pointsPath, path, outPath, 5);
     
-    for (int i = 1; i <= 5; i++) {
+    for (int i = 1; i <= 4; i++) {
       System.out.println("Classifier-" + i);
       ClusterClassifier posterior = new ClusterClassifier();
-      posterior.readFromSeqFiles(new Path(outPath, "classifier-" + i));
+      String name = i == 4 ? "clusters-4-final" : "clusters-" + i;
+      posterior.readFromSeqFiles(new Path(outPath, name));
       assertEquals(3, posterior.getModels().size());
       for (Cluster cluster : posterior.getModels()) {
         System.out.println(cluster.asFormatString(null));
@@ -262,19 +260,20 @@ public final class TestClusterClassifier
     Path path = new Path(priorPath, "priorClassifier");
     ClusterClassifier prior = newKlusterClassifier();
     prior.writeToSeqFiles(path);
+    ClusteringPolicy policy = new KMeansClusteringPolicy();
+    ClusterClassifier.writePolicy(policy, path);
     assertEquals(3, prior.getModels().size());
     System.out.println("Prior");
     for (Cluster cluster : prior.getModels()) {
       System.out.println(cluster.asFormatString(null));
     }
-    ClusteringPolicy policy = new KMeansClusteringPolicy();
-    ClusterIterator iterator = new ClusterIterator(policy);
-    iterator.iterateMR(pointsPath, path, outPath, 3);
+    new ClusterIterator().iterateMR(pointsPath, path, outPath, 5);
     
-    for (int i = 1; i <= 3; i++) {
+    for (int i = 1; i <= 4; i++) {
       System.out.println("Classifier-" + i);
       ClusterClassifier posterior = new ClusterClassifier();
-      posterior.readFromSeqFiles(new Path(outPath, "clusters-" + i));
+      String name = i == 4 ? "clusters-4-final" : "clusters-" + i;
+      posterior.readFromSeqFiles(new Path(outPath, name));
       assertEquals(3, posterior.getModels().size());
       for (Cluster cluster : posterior.getModels()) {
         System.out.println(cluster.asFormatString(null));

Modified: 
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/ClustersFilter.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/ClustersFilter.java?rev=1298625&r1=1298624&r2=1298625&view=diff
==============================================================================
--- 
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/ClustersFilter.java
 (original)
+++ 
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/ClustersFilter.java
 Thu Mar  8 22:27:28 2012
@@ -19,14 +19,13 @@ package org.apache.mahout.clustering.dis
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
-import org.apache.mahout.clustering.Cluster;
 
 final class ClustersFilter implements PathFilter {
 
   @Override
   public boolean accept(Path path) {
     String pathString = path.toString();
-    return pathString.contains("/clusters-") && 
pathString.endsWith(Cluster.FINAL_ITERATION_SUFFIX);
+    return pathString.contains("/clusters-"); // final-suffix condition dropped: any path containing "/clusters-" is now accepted
   }
 
 }

Modified: 
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayClustering.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayClustering.java?rev=1298625&r1=1298624&r2=1298625&view=diff
==============================================================================
--- 
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayClustering.java
 (original)
+++ 
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayClustering.java
 Thu Mar  8 22:27:28 2012
@@ -45,6 +45,7 @@ import org.apache.mahout.clustering.Abst
 import org.apache.mahout.clustering.Cluster;
 import org.apache.mahout.clustering.classify.WeightedVectorWritable;
 import org.apache.mahout.clustering.dirichlet.UncommonDistributions;
+import org.apache.mahout.clustering.iterator.ClusterWritable;
 import org.apache.mahout.common.Pair;
 import org.apache.mahout.common.RandomUtils;
 import org.apache.mahout.common.iterator.sequencefile.PathFilters;
@@ -342,6 +343,21 @@ public class DisplayClustering extends F
     return clusters;
   }
   
+  protected static List<Cluster> readClustersWritable(Path clustersIn) { // Collects the Cluster wrapped by each ClusterWritable value read from clustersIn.
+    List<Cluster> clusters = Lists.newArrayList();
+    Configuration conf = new Configuration(); // fresh default Configuration, passed to the iterable below
+    for (ClusterWritable value : new 
SequenceFileDirValueIterable<ClusterWritable>(clustersIn, PathType.LIST,
+        PathFilters.logsCRCFilter(), conf)) { // presumably skips log/CRC side files - confirm against PathFilters
+      Cluster cluster = value.getValue(); // unwrap the Cluster carried by the writable
+      log.info(
+          "Reading Cluster:{} center:{} numPoints:{} radius:{}",
+          new Object[] {cluster.getId(), 
AbstractCluster.formatVector(cluster.getCenter(), null),
+              cluster.getNumObservations(), 
AbstractCluster.formatVector(cluster.getRadius(), null)});
+      clusters.add(cluster); // result keeps the order in which values were iterated
+    }
+    return clusters; // empty list when the iterable yields no values
+  }
+  
   protected static void loadClusters(Path output) throws IOException {
     Configuration conf = new Configuration();
     FileSystem fs = FileSystem.get(output.toUri(), conf);
@@ -351,6 +367,15 @@ public class DisplayClustering extends F
     }
   }
   
+  protected static void loadClustersWritable(Path output) throws IOException { // Appends one cluster list to CLUSTERS for each entry of output accepted by ClustersFilter.
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(output.toUri(), conf); // file system is resolved from output's own URI
+    for (FileStatus s : fs.listStatus(output, new ClustersFilter())) { // NOTE(review): lists are added in listStatus order - confirm it matches iteration order
+      List<Cluster> clusters = readClustersWritable(s.getPath()); // each accepted entry is read as ClusterWritable values
+      CLUSTERS.add(clusters);
+    }
+  }
+  
   protected static void loadClusters(Path output, PathFilter filter) throws 
IOException {
     Configuration conf = new Configuration();
     FileSystem fs = FileSystem.get(output.toUri(), conf);

Modified: 
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayDirichlet.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayDirichlet.java?rev=1298625&r1=1298624&r2=1298625&view=diff
==============================================================================
--- 
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayDirichlet.java
 (original)
+++ 
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayDirichlet.java
 Thu Mar  8 22:27:28 2012
@@ -96,11 +96,11 @@ public class DisplayDirichlet extends Di
     Path priorPath = new Path(output, "clusters-0");
     prior.writeToSeqFiles(priorPath);
     
-    ClusteringPolicy policy = new DirichletClusteringPolicy(numClusters, 
numIterations);
-    new ClusterIterator(policy).iterateSeq(samples, priorPath, output, 
numIterations);
+    new ClusterIterator().iterateSeq(samples, priorPath, output, 
numIterations);
     for (int i = 1; i <= numIterations; i++) {
       ClusterClassifier posterior = new ClusterClassifier();
-      posterior.readFromSeqFiles(new Path(output, "classifier-" + i));
+      String name = i == numIterations ? "clusters-" + i + "-final" : 
"clusters-" + i;
+      posterior.readFromSeqFiles(new Path(output, name));
       List<Cluster> clusters = Lists.newArrayList();
       for (Cluster cluster : posterior.getModels()) {
         if (isSignificant(cluster)) {

Modified: 
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayFuzzyKMeans.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayFuzzyKMeans.java?rev=1298625&r1=1298624&r2=1298625&view=diff
==============================================================================
--- 
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayFuzzyKMeans.java
 (original)
+++ 
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayFuzzyKMeans.java
 Thu Mar  8 22:27:28 2012
@@ -23,7 +23,6 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 
-import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.mahout.clustering.Cluster;
@@ -31,7 +30,6 @@ import org.apache.mahout.clustering.clas
 import org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver;
 import org.apache.mahout.clustering.fuzzykmeans.SoftCluster;
 import org.apache.mahout.clustering.iterator.ClusterIterator;
-import org.apache.mahout.clustering.iterator.ClusteringPolicy;
 import org.apache.mahout.clustering.iterator.FuzzyKMeansClusteringPolicy;
 import org.apache.mahout.clustering.kmeans.RandomSeedGenerator;
 import org.apache.mahout.common.HadoopUtil;
@@ -40,6 +38,8 @@ import org.apache.mahout.common.distance
 import org.apache.mahout.common.distance.ManhattanDistanceMeasure;
 import org.apache.mahout.math.Vector;
 
+import com.google.common.collect.Lists;
+
 public class DisplayFuzzyKMeans extends DisplayClustering {
   
   DisplayFuzzyKMeans() {
@@ -93,13 +93,8 @@ public class DisplayFuzzyKMeans extends 
     Path priorPath = new Path(output, "classifier-0");
     prior.writeToSeqFiles(priorPath);
     
-    ClusteringPolicy policy = new FuzzyKMeansClusteringPolicy(1.1, 0.001);
-    new ClusterIterator(policy).iterateSeq(samples, priorPath, output, 
maxIterations);
-    for (int i = 1; i <= maxIterations; i++) {
-      ClusterClassifier posterior = new ClusterClassifier();
-      posterior.readFromSeqFiles(new Path(output, "classifier-" + i));
-      CLUSTERS.add(posterior.getModels());
-    }
+    new ClusterIterator().iterateSeq(samples, priorPath, output, 
maxIterations);
+    loadClustersWritable(output);
   }
   
   private static void runSequentialFuzzyKClusterer(Configuration conf, Path 
samples, Path output,

Modified: 
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayKMeans.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayKMeans.java?rev=1298625&r1=1298624&r2=1298625&view=diff
==============================================================================
--- 
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayKMeans.java
 (original)
+++ 
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayKMeans.java
 Thu Mar  8 22:27:28 2012
@@ -23,13 +23,11 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 
-import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.mahout.clustering.Cluster;
 import org.apache.mahout.clustering.classify.ClusterClassifier;
 import org.apache.mahout.clustering.iterator.ClusterIterator;
-import org.apache.mahout.clustering.iterator.ClusteringPolicy;
 import org.apache.mahout.clustering.iterator.KMeansClusteringPolicy;
 import org.apache.mahout.clustering.kmeans.KMeansDriver;
 import org.apache.mahout.clustering.kmeans.RandomSeedGenerator;
@@ -39,6 +37,8 @@ import org.apache.mahout.common.distance
 import org.apache.mahout.common.distance.ManhattanDistanceMeasure;
 import org.apache.mahout.math.Vector;
 
+import com.google.common.collect.Lists;
+
 public class DisplayKMeans extends DisplayClustering {
   
   DisplayKMeans() {
@@ -85,13 +85,8 @@ public class DisplayKMeans extends Displ
     prior.writeToSeqFiles(priorPath);
     
     int maxIter = 10;
-    ClusteringPolicy policy = new KMeansClusteringPolicy();
-    new ClusterIterator(policy).iterateSeq(samples, priorPath, output, 
maxIter);
-    for (int i = 1; i <= maxIter; i++) {
-      ClusterClassifier posterior = new ClusterClassifier();
-      posterior.readFromSeqFiles(new Path(output, "classifier-" + i));
-      CLUSTERS.add(posterior.getModels());
-    }
+    new ClusterIterator().iterateSeq(samples, priorPath, output, maxIter);
+    loadClustersWritable(output);
   }
   
   private static void runSequentialKMeansClusterer(Configuration conf, Path 
samples, Path output,

Modified: 
mahout/trunk/examples/src/test/java/org/apache/mahout/clustering/display/ClustersFilterTest.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/examples/src/test/java/org/apache/mahout/clustering/display/ClustersFilterTest.java?rev=1298625&r1=1298624&r2=1298625&view=diff
==============================================================================
--- 
mahout/trunk/examples/src/test/java/org/apache/mahout/clustering/display/ClustersFilterTest.java
 (original)
+++ 
mahout/trunk/examples/src/test/java/org/apache/mahout/clustering/display/ClustersFilterTest.java
 Thu Mar  8 22:27:28 2012
@@ -49,8 +49,8 @@ public class ClustersFilterTest extends 
 
     PathFilter clustersFilter = new ClustersFilter();
 
-    assertFalse(clustersFilter.accept(path0));
-    assertFalse(clustersFilter.accept(path1));
+    assertTrue(clustersFilter.accept(path0));
+    assertTrue(clustersFilter.accept(path1));
   }
 
   @Test
@@ -67,9 +67,9 @@ public class ClustersFilterTest extends 
 
     PathFilter clustersFilter = new ClustersFilter();
 
-    assertFalse(clustersFilter.accept(path0));
-    assertFalse(clustersFilter.accept(path1));
-    assertFalse(clustersFilter.accept(path2));
+    assertTrue(clustersFilter.accept(path0));
+    assertTrue(clustersFilter.accept(path1));
+    assertTrue(clustersFilter.accept(path2));
     assertTrue(clustersFilter.accept(path3Final));
   }
 }


Reply via email to