http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java
----------------------------------------------------------------------
diff --git 
a/mr/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java
 
b/mr/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java
new file mode 100644
index 0000000..c6c8427
--- /dev/null
+++ 
b/mr/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java
@@ -0,0 +1,324 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.fuzzykmeans;
+
+import java.io.IOException;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.classify.ClusterClassificationDriver;
+import org.apache.mahout.clustering.classify.ClusterClassifier;
+import org.apache.mahout.clustering.iterator.ClusterIterator;
+import org.apache.mahout.clustering.iterator.ClusteringPolicy;
+import org.apache.mahout.clustering.iterator.FuzzyKMeansClusteringPolicy;
+import org.apache.mahout.clustering.kmeans.RandomSeedGenerator;
+import org.apache.mahout.clustering.topdown.PathDirectory;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.ClassUtils;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.distance.SquaredEuclideanDistanceMeasure;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FuzzyKMeansDriver extends AbstractJob {
+
+  public static final String M_OPTION = "m";
+
+  private static final Logger log = 
LoggerFactory.getLogger(FuzzyKMeansDriver.class);
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new Configuration(), new FuzzyKMeansDriver(), args);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+
+    addInputOption();
+    addOutputOption();
+    addOption(DefaultOptionCreator.distanceMeasureOption().create());
+    addOption(DefaultOptionCreator.clustersInOption()
+        .withDescription("The input centroids, as Vectors.  Must be a 
SequenceFile of Writable, Cluster/Canopy.  "
+            + "If k is also specified, then a random set of vectors will be 
selected"
+            + " and written out to this path first")
+        .create());
+    addOption(DefaultOptionCreator.numClustersOption()
+        .withDescription("The k in k-Means.  If specified, then a random 
selection of k Vectors will be chosen"
+            + " as the Centroid and written to the clusters input 
path.").create());
+    addOption(DefaultOptionCreator.convergenceOption().create());
+    addOption(DefaultOptionCreator.maxIterationsOption().create());
+    addOption(DefaultOptionCreator.overwriteOption().create());
+    addOption(M_OPTION, M_OPTION, "coefficient normalization factor, must be 
greater than 1", true);
+    addOption(DefaultOptionCreator.clusteringOption().create());
+    addOption(DefaultOptionCreator.emitMostLikelyOption().create());
+    addOption(DefaultOptionCreator.thresholdOption().create());
+    addOption(DefaultOptionCreator.methodOption().create());
+    addOption(DefaultOptionCreator.useSetRandomSeedOption().create());
+
+    if (parseArguments(args) == null) {
+      return -1;
+    }
+
+    Path input = getInputPath();
+    Path clusters = new 
Path(getOption(DefaultOptionCreator.CLUSTERS_IN_OPTION));
+    Path output = getOutputPath();
+    String measureClass = 
getOption(DefaultOptionCreator.DISTANCE_MEASURE_OPTION);
+    if (measureClass == null) {
+      measureClass = SquaredEuclideanDistanceMeasure.class.getName();
+    }
+    double convergenceDelta = 
Double.parseDouble(getOption(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION));
+    float fuzziness = Float.parseFloat(getOption(M_OPTION));
+
+    int maxIterations = 
Integer.parseInt(getOption(DefaultOptionCreator.MAX_ITERATIONS_OPTION));
+    if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
+      HadoopUtil.delete(getConf(), output);
+    }
+    boolean emitMostLikely = 
Boolean.parseBoolean(getOption(DefaultOptionCreator.EMIT_MOST_LIKELY_OPTION));
+    double threshold = 
Double.parseDouble(getOption(DefaultOptionCreator.THRESHOLD_OPTION));
+    DistanceMeasure measure = ClassUtils.instantiateAs(measureClass, 
DistanceMeasure.class);
+
+    if (hasOption(DefaultOptionCreator.NUM_CLUSTERS_OPTION)) {
+      int numClusters = 
Integer.parseInt(getOption(DefaultOptionCreator.NUM_CLUSTERS_OPTION));
+
+      Long seed = null;
+      if (hasOption(DefaultOptionCreator.RANDOM_SEED)) {
+        seed = Long.parseLong(getOption(DefaultOptionCreator.RANDOM_SEED));
+      }
+
+      clusters = RandomSeedGenerator.buildRandom(getConf(), input, clusters, 
numClusters, measure, seed);
+    }
+
+    boolean runClustering = hasOption(DefaultOptionCreator.CLUSTERING_OPTION);
+    boolean runSequential = 
getOption(DefaultOptionCreator.METHOD_OPTION).equalsIgnoreCase(
+        DefaultOptionCreator.SEQUENTIAL_METHOD);
+
+    run(getConf(),
+        input,
+        clusters,
+        output,
+        convergenceDelta,
+        maxIterations,
+        fuzziness,
+        runClustering,
+        emitMostLikely,
+        threshold,
+        runSequential);
+    return 0;
+  }
+
+  /**
+   * Iterate over the input vectors to produce clusters and, if requested, use 
the
+   * results of the final iteration to cluster the input vectors.
+   *
+   * @param input
+   *          the directory pathname for input points
+   * @param clustersIn
+   *          the directory pathname for initial & computed clusters
+   * @param output
+ *          the directory pathname for output points
+   * @param convergenceDelta
+*          the convergence delta value
+   * @param maxIterations
+*          the maximum number of iterations
+   * @param m
+*          the fuzzification factor, see
+*          
http://en.wikipedia.org/wiki/Data_clustering#Fuzzy_c-means_clustering
+   * @param runClustering
+*          true if points are to be clustered after iterations complete
+   * @param emitMostLikely
+*          a boolean if true emit only most likely cluster for each point
+   * @param threshold
+*          a double threshold value emits all clusters having greater pdf 
(emitMostLikely = false)
+   * @param runSequential if true run in sequential execution mode
+   */
+  public static void run(Path input,
+                         Path clustersIn,
+                         Path output,
+                         double convergenceDelta,
+                         int maxIterations,
+                         float m,
+                         boolean runClustering,
+                         boolean emitMostLikely,
+                         double threshold,
+                         boolean runSequential) throws IOException, 
ClassNotFoundException, InterruptedException {
+    Configuration conf = new Configuration();
+    Path clustersOut = buildClusters(conf,
+                                     input,
+                                     clustersIn,
+                                     output,
+                                     convergenceDelta,
+                                     maxIterations,
+                                     m,
+                                     runSequential);
+    if (runClustering) {
+      log.info("Clustering ");
+      clusterData(conf, input,
+                  clustersOut,
+                  output,
+                  convergenceDelta,
+                  m,
+                  emitMostLikely,
+                  threshold,
+                  runSequential);
+    }
+  }
+
+  /**
+   * Iterate over the input vectors to produce clusters and, if requested, use 
the
+   * results of the final iteration to cluster the input vectors.
+   * @param input
+   *          the directory pathname for input points
+   * @param clustersIn
+   *          the directory pathname for initial & computed clusters
+   * @param output
+ *          the directory pathname for output points
+   * @param convergenceDelta
+*          the convergence delta value
+   * @param maxIterations
+*          the maximum number of iterations
+   * @param m
+*          the fuzzification factor, see
+*          
http://en.wikipedia.org/wiki/Data_clustering#Fuzzy_c-means_clustering
+   * @param runClustering
+*          true if points are to be clustered after iterations complete
+   * @param emitMostLikely
+*          a boolean if true emit only most likely cluster for each point
+   * @param threshold
+*          a double threshold value emits all clusters having greater pdf 
(emitMostLikely = false)
+   * @param runSequential if true run in sequential execution mode
+   */
+  public static void run(Configuration conf,
+                         Path input,
+                         Path clustersIn,
+                         Path output,
+                         double convergenceDelta,
+                         int maxIterations,
+                         float m,
+                         boolean runClustering,
+                         boolean emitMostLikely,
+                         double threshold,
+                         boolean runSequential)
+    throws IOException, ClassNotFoundException, InterruptedException {
+    Path clustersOut =
+        buildClusters(conf, input, clustersIn, output, convergenceDelta, 
maxIterations, m, runSequential);
+    if (runClustering) {
+      log.info("Clustering");
+      clusterData(conf, 
+                  input,
+                  clustersOut,
+                  output,
+                  convergenceDelta,
+                  m,
+                  emitMostLikely,
+                  threshold,
+                  runSequential);
+    }
+  }
+
+  /**
+   * Iterate over the input vectors to produce cluster directories for each 
iteration
+   *
+   * @param input
+   *          the directory pathname for input points
+   * @param clustersIn
+   *          the file pathname for initial cluster centers
+   * @param output
+   *          the directory pathname for output points
+   * @param convergenceDelta
+   *          the convergence delta value
+   * @param maxIterations
+   *          the maximum number of iterations
+   * @param m
+   *          the fuzzification factor, see
+   *          
http://en.wikipedia.org/wiki/Data_clustering#Fuzzy_c-means_clustering
+   * @param runSequential if true run in sequential execution mode
+   *
+   * @return the Path of the final clusters directory
+   */
+  public static Path buildClusters(Configuration conf,
+                                   Path input,
+                                   Path clustersIn,
+                                   Path output,
+                                   double convergenceDelta,
+                                   int maxIterations,
+                                   float m,
+                                   boolean runSequential)
+    throws IOException, InterruptedException, ClassNotFoundException {
+    
+    List<Cluster> clusters = Lists.newArrayList();
+    FuzzyKMeansUtil.configureWithClusterInfo(conf, clustersIn, clusters);
+    
+    if (conf == null) {
+      conf = new Configuration();
+    }
+    
+    if (clusters.isEmpty()) {
+      throw new IllegalStateException("No input clusters found in " + 
clustersIn + ". Check your -c argument.");
+    }
+    
+    Path priorClustersPath = new Path(output, Cluster.INITIAL_CLUSTERS_DIR);   
+    ClusteringPolicy policy = new FuzzyKMeansClusteringPolicy(m, 
convergenceDelta);
+    ClusterClassifier prior = new ClusterClassifier(clusters, policy);
+    prior.writeToSeqFiles(priorClustersPath);
+    
+    if (runSequential) {
+      ClusterIterator.iterateSeq(conf, input, priorClustersPath, output, 
maxIterations);
+    } else {
+      ClusterIterator.iterateMR(conf, input, priorClustersPath, output, 
maxIterations);
+    }
+    return output;
+  }
+
+  /**
+   * Run the job using supplied arguments
+   *
+   * @param input
+   *          the directory pathname for input points
+   * @param clustersIn
+   *          the directory pathname for input clusters
+   * @param output
+ *          the directory pathname for output points
+   * @param convergenceDelta
+*          the convergence delta value
+   * @param emitMostLikely
+*          a boolean if true emit only most likely cluster for each point
+   * @param threshold
+*          a double threshold value emits all clusters having greater pdf 
(emitMostLikely = false)
+   * @param runSequential if true run in sequential execution mode
+   */
+  public static void clusterData(Configuration conf,
+                                 Path input,
+                                 Path clustersIn,
+                                 Path output,
+                                 double convergenceDelta,
+                                 float m,
+                                 boolean emitMostLikely,
+                                 double threshold,
+                                 boolean runSequential)
+    throws IOException, ClassNotFoundException, InterruptedException {
+    
+    ClusterClassifier.writePolicy(new FuzzyKMeansClusteringPolicy(m, 
convergenceDelta), clustersIn);
+    ClusterClassificationDriver.run(conf, input, output, new Path(output, 
PathDirectory.CLUSTERED_POINTS_DIRECTORY),
+        threshold, emitMostLikely, runSequential);
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansUtil.java
----------------------------------------------------------------------
diff --git 
a/mr/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansUtil.java
 
b/mr/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansUtil.java
new file mode 100644
index 0000000..25621bb
--- /dev/null
+++ 
b/mr/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansUtil.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.fuzzykmeans;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.canopy.Canopy;
+import org.apache.mahout.clustering.iterator.ClusterWritable;
+import org.apache.mahout.clustering.kmeans.Kluster;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import 
org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterable;
+
+final class FuzzyKMeansUtil {
+  
+  private FuzzyKMeansUtil() {}
+  
+  /**
+   * Create a list of SoftClusters from whatever type is passed in as the prior
+   * 
+   * @param conf
+   *          the Configuration
+   * @param clusterPath
+   *          the path to the prior Clusters
+   * @param clusters
+   *          a List<Cluster> to put values into
+   */
+  public static void configureWithClusterInfo(Configuration conf, Path 
clusterPath, List<Cluster> clusters) {
+    for (Writable value : new SequenceFileDirValueIterable<>(clusterPath, 
PathType.LIST,
+        PathFilters.partFilter(), conf)) {
+      Class<? extends Writable> valueClass = value.getClass();
+      
+      if (valueClass.equals(ClusterWritable.class)) {
+        ClusterWritable clusterWritable = (ClusterWritable) value;
+        value = clusterWritable.getValue();
+        valueClass = value.getClass();
+      }
+      
+      if (valueClass.equals(Kluster.class)) {
+        // get the cluster info
+        Kluster cluster = (Kluster) value;
+        clusters.add(new SoftCluster(cluster.getCenter(), cluster.getId(), 
cluster.getMeasure()));
+      } else if (valueClass.equals(SoftCluster.class)) {
+        // get the cluster info
+        clusters.add((SoftCluster) value);
+      } else if (valueClass.equals(Canopy.class)) {
+        // get the cluster info
+        Canopy canopy = (Canopy) value;
+        clusters.add(new SoftCluster(canopy.getCenter(), canopy.getId(), 
canopy.getMeasure()));
+      } else {
+        throw new IllegalStateException("Bad value class: " + valueClass);
+      }
+    }
+    
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/fuzzykmeans/SoftCluster.java
----------------------------------------------------------------------
diff --git 
a/mr/src/main/java/org/apache/mahout/clustering/fuzzykmeans/SoftCluster.java 
b/mr/src/main/java/org/apache/mahout/clustering/fuzzykmeans/SoftCluster.java
new file mode 100644
index 0000000..52fd764
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/clustering/fuzzykmeans/SoftCluster.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.fuzzykmeans;
+
+import org.apache.mahout.clustering.kmeans.Kluster;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+public class SoftCluster extends Kluster {
+  
+  // For Writable
+  public SoftCluster() {}
+  
+  /**
+   * Construct a new SoftCluster with the given point as its center
+   * 
+   * @param center
+   *          the center point
+   * @param measure
+   *          the DistanceMeasure
+   */
+  public SoftCluster(Vector center, int clusterId, DistanceMeasure measure) {
+    super(center, clusterId, measure);
+  }
+  
+  @Override
+  public String asFormatString() {
+    return this.getIdentifier() + ": "
+        + this.computeCentroid().asFormatString();
+  }
+  
+  @Override
+  public String getIdentifier() {
+    return (isConverged() ? "SV-" : "SC-") + getId();
+  }
+  
+  @Override
+  public double pdf(VectorWritable vw) {
+    // SoftCluster pdf cannot be calculated out of context. See
+    // FuzzyKMeansClusterer
+    throw new UnsupportedOperationException(
+        "SoftCluster pdf cannot be calculated out of context. See 
FuzzyKMeansClusterer");
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/iterator/AbstractClusteringPolicy.java
----------------------------------------------------------------------
diff --git 
a/mr/src/main/java/org/apache/mahout/clustering/iterator/AbstractClusteringPolicy.java
 
b/mr/src/main/java/org/apache/mahout/clustering/iterator/AbstractClusteringPolicy.java
new file mode 100644
index 0000000..07cc7e3
--- /dev/null
+++ 
b/mr/src/main/java/org/apache/mahout/clustering/iterator/AbstractClusteringPolicy.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.iterator;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.classify.ClusterClassifier;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.SequentialAccessSparseVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.function.TimesFunction;
+
+public abstract class AbstractClusteringPolicy implements ClusteringPolicy {
+  
+  @Override
+  public abstract void write(DataOutput out) throws IOException;
+  
+  @Override
+  public abstract void readFields(DataInput in) throws IOException;
+  
+  @Override
+  public Vector select(Vector probabilities) {
+    int maxValueIndex = probabilities.maxValueIndex();
+    Vector weights = new SequentialAccessSparseVector(probabilities.size());
+    weights.set(maxValueIndex, 1.0);
+    return weights;
+  }
+  
+  @Override
+  public void update(ClusterClassifier posterior) {
+    // nothing to do in general here
+  }
+  
+  @Override
+  public Vector classify(Vector data, ClusterClassifier prior) {
+    List<Cluster> models = prior.getModels();
+    int i = 0;
+    Vector pdfs = new DenseVector(models.size());
+    for (Cluster model : models) {
+      pdfs.set(i++, model.pdf(new VectorWritable(data)));
+    }
+    return pdfs.assign(new TimesFunction(), 1.0 / pdfs.zSum());
+  }
+  
+  @Override
+  public void close(ClusterClassifier posterior) {
+    for (Cluster cluster : posterior.getModels()) {
+      cluster.computeParameters();
+    }
+    
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/iterator/CIMapper.java
----------------------------------------------------------------------
diff --git 
a/mr/src/main/java/org/apache/mahout/clustering/iterator/CIMapper.java 
b/mr/src/main/java/org/apache/mahout/clustering/iterator/CIMapper.java
new file mode 100644
index 0000000..fb2db49
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/clustering/iterator/CIMapper.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.iterator;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.classify.ClusterClassifier;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.Vector.Element;
+import org.apache.mahout.math.VectorWritable;
+
+public class CIMapper extends 
Mapper<WritableComparable<?>,VectorWritable,IntWritable,ClusterWritable> {
+  
+  private ClusterClassifier classifier;
+  private ClusteringPolicy policy;
+
+  @Override
+  protected void setup(Context context) throws IOException, 
InterruptedException {
+    Configuration conf = context.getConfiguration();
+    String priorClustersPath = conf.get(ClusterIterator.PRIOR_PATH_KEY);
+    classifier = new ClusterClassifier();
+    classifier.readFromSeqFiles(conf, new Path(priorClustersPath));
+    policy = classifier.getPolicy();
+    policy.update(classifier);
+    super.setup(context);
+  }
+
+  @Override
+  protected void map(WritableComparable<?> key, VectorWritable value, Context 
context) throws IOException,
+      InterruptedException {
+    Vector probabilities = classifier.classify(value.get());
+    Vector selections = policy.select(probabilities);
+    for (Element el : selections.nonZeroes()) {
+      classifier.train(el.index(), value.get(), el.get());
+    }
+  }
+
+  @Override
+  protected void cleanup(Context context) throws IOException, 
InterruptedException {
+    List<Cluster> clusters = classifier.getModels();
+    ClusterWritable cw = new ClusterWritable();
+    for (int index = 0; index < clusters.size(); index++) {
+      cw.setValue(clusters.get(index));
+      context.write(new IntWritable(index), cw);
+    }
+    super.cleanup(context);
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/iterator/CIReducer.java
----------------------------------------------------------------------
diff --git 
a/mr/src/main/java/org/apache/mahout/clustering/iterator/CIReducer.java 
b/mr/src/main/java/org/apache/mahout/clustering/iterator/CIReducer.java
new file mode 100644
index 0000000..bf42eb1
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/clustering/iterator/CIReducer.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.iterator;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.classify.ClusterClassifier;
+
+public class CIReducer extends 
Reducer<IntWritable,ClusterWritable,IntWritable,ClusterWritable> {
+  
+  private ClusterClassifier classifier;
+  private ClusteringPolicy policy;
+  
+  @Override
+  protected void reduce(IntWritable key, Iterable<ClusterWritable> values, 
Context context) throws IOException,
+      InterruptedException {
+    Iterator<ClusterWritable> iter = values.iterator();
+    Cluster first = iter.next().getValue(); // there must always be at least 
one
+    while (iter.hasNext()) {
+      Cluster cluster = iter.next().getValue();
+      first.observe(cluster);
+    }
+    List<Cluster> models = Lists.newArrayList();
+    models.add(first);
+    classifier = new ClusterClassifier(models, policy);
+    classifier.close();
+    context.write(key, new ClusterWritable(first));
+  }
+
+  @Override
+  protected void setup(Context context) throws IOException, 
InterruptedException {
+    Configuration conf = context.getConfiguration();
+    String priorClustersPath = conf.get(ClusterIterator.PRIOR_PATH_KEY);
+    classifier = new ClusterClassifier();
+    classifier.readFromSeqFiles(conf, new Path(priorClustersPath));
+    policy = classifier.getPolicy();
+    policy.update(classifier);
+    super.setup(context);
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/iterator/CanopyClusteringPolicy.java
----------------------------------------------------------------------
diff --git 
a/mr/src/main/java/org/apache/mahout/clustering/iterator/CanopyClusteringPolicy.java
 
b/mr/src/main/java/org/apache/mahout/clustering/iterator/CanopyClusteringPolicy.java
new file mode 100644
index 0000000..c9a0940
--- /dev/null
+++ 
b/mr/src/main/java/org/apache/mahout/clustering/iterator/CanopyClusteringPolicy.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.iterator;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.mahout.math.SequentialAccessSparseVector;
+import org.apache.mahout.math.Vector;
+
+@Deprecated
+public class CanopyClusteringPolicy extends AbstractClusteringPolicy {
+
+  private double t1;
+  private double t2;
+
+  @Override
+  public Vector select(Vector probabilities) {
+    int maxValueIndex = probabilities.maxValueIndex();
+    Vector weights = new SequentialAccessSparseVector(probabilities.size());
+    weights.set(maxValueIndex, 1.0);
+    return weights;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeDouble(t1);
+    out.writeDouble(t2);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.t1 = in.readDouble();
+    this.t2 = in.readDouble();
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/iterator/ClusterIterator.java
----------------------------------------------------------------------
diff --git 
a/mr/src/main/java/org/apache/mahout/clustering/iterator/ClusterIterator.java 
b/mr/src/main/java/org/apache/mahout/clustering/iterator/ClusterIterator.java
new file mode 100644
index 0000000..516177f
--- /dev/null
+++ 
b/mr/src/main/java/org/apache/mahout/clustering/iterator/ClusterIterator.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.iterator;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.classify.ClusterClassifier;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import 
org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterable;
+import 
org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterator;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+import com.google.common.io.Closeables;
+
+/**
+ * This is a clustering iterator which works with a set of Vector data and a 
prior ClusterClassifier which has been
+ * initialized with a set of models. Its implementation is algorithm-neutral 
and works for any iterative clustering
+ * algorithm (currently k-means and fuzzy-k-means) that processes all the 
input vectors in each iteration.
+ * The cluster classifier is configured with a ClusteringPolicy to select the 
desired clustering algorithm.
+ */
+public final class ClusterIterator {
+  
+  public static final String PRIOR_PATH_KEY = 
"org.apache.mahout.clustering.prior.path";
+
+  private ClusterIterator() {
+  }
+  
+  /**
+   * Iterate over data using a prior-trained ClusterClassifier, for a number 
of iterations
+   *
+   * @param data
+   *          a {@code List<Vector>} of input vectors
+   * @param classifier
+   *          a prior ClusterClassifier
+   * @param numIterations
+   *          the int number of iterations to perform
+   * 
+   * @return the posterior ClusterClassifier
+   */
+  public static ClusterClassifier iterate(Iterable<Vector> data, 
ClusterClassifier classifier, int numIterations) {
+    ClusteringPolicy policy = classifier.getPolicy();
+    for (int iteration = 1; iteration <= numIterations; iteration++) {
+      for (Vector vector : data) {
+        // update the policy based upon the prior
+        policy.update(classifier);
+        // classification yields probabilities
+        Vector probabilities = classifier.classify(vector);
+        // policy selects weights for models given those probabilities
+        Vector weights = policy.select(probabilities);
+        // training causes all models to observe data
+        for (Vector.Element e : weights.nonZeroes()) {
+          int index = e.index();
+          classifier.train(index, vector, weights.get(index));
+        }
+      }
+      // compute the posterior models
+      classifier.close();
+    }
+    return classifier;
+  }
+  
+  /**
+   * Iterate over data using a prior-trained ClusterClassifier, for a number 
of iterations using a sequential
+   * implementation
+   * 
+   * @param conf
+   *          the Configuration
+   * @param inPath
+   *          a Path to input VectorWritables
+   * @param priorPath
+   *          a Path to the prior classifier
+   * @param outPath
+   *          a Path of output directory
+   * @param numIterations
+   *          the int number of iterations to perform
+   */
+  public static void iterateSeq(Configuration conf, Path inPath, Path 
priorPath, Path outPath, int numIterations)
+    throws IOException {
+    ClusterClassifier classifier = new ClusterClassifier();
+    classifier.readFromSeqFiles(conf, priorPath);
+    Path clustersOut = null;
+    int iteration = 1;
+    while (iteration <= numIterations) {
+      for (VectorWritable vw : new 
SequenceFileDirValueIterable<VectorWritable>(inPath, PathType.LIST,
+          PathFilters.logsCRCFilter(), conf)) {
+        Vector vector = vw.get();
+        // classification yields probabilities
+        Vector probabilities = classifier.classify(vector);
+        // policy selects weights for models given those probabilities
+        Vector weights = classifier.getPolicy().select(probabilities);
+        // training causes all models to observe data
+        for (Vector.Element e : weights.nonZeroes()) {
+          int index = e.index();
+          classifier.train(index, vector, weights.get(index));
+        }
+      }
+      // compute the posterior models
+      classifier.close();
+      // update the policy
+      classifier.getPolicy().update(classifier);
+      // output the classifier
+      clustersOut = new Path(outPath, Cluster.CLUSTERS_DIR + iteration);
+      classifier.writeToSeqFiles(clustersOut);
+      FileSystem fs = FileSystem.get(outPath.toUri(), conf);
+      iteration++;
+      if (isConverged(clustersOut, conf, fs)) {
+        break;
+      }
+    }
+    Path finalClustersIn = new Path(outPath, Cluster.CLUSTERS_DIR + (iteration 
- 1) + Cluster.FINAL_ITERATION_SUFFIX);
+    FileSystem.get(clustersOut.toUri(), conf).rename(clustersOut, 
finalClustersIn);
+  }
+  
+  /**
+   * Iterate over data using a prior-trained ClusterClassifier, for a number 
of iterations using a mapreduce
+   * implementation
+   * 
+   * @param conf
+   *          the Configuration
+   * @param inPath
+   *          a Path to input VectorWritables
+   * @param priorPath
+   *          a Path to the prior classifier
+   * @param outPath
+   *          a Path of output directory
+   * @param numIterations
+   *          the int number of iterations to perform
+   */
+  public static void iterateMR(Configuration conf, Path inPath, Path 
priorPath, Path outPath, int numIterations)
+    throws IOException, InterruptedException, ClassNotFoundException {
+    ClusteringPolicy policy = ClusterClassifier.readPolicy(priorPath);
+    Path clustersOut = null;
+    int iteration = 1;
+    while (iteration <= numIterations) {
+      conf.set(PRIOR_PATH_KEY, priorPath.toString());
+      
+      String jobName = "Cluster Iterator running iteration " + iteration + " 
over priorPath: " + priorPath;
+      Job job = new Job(conf, jobName);
+      job.setMapOutputKeyClass(IntWritable.class);
+      job.setMapOutputValueClass(ClusterWritable.class);
+      job.setOutputKeyClass(IntWritable.class);
+      job.setOutputValueClass(ClusterWritable.class);
+      
+      job.setInputFormatClass(SequenceFileInputFormat.class);
+      job.setOutputFormatClass(SequenceFileOutputFormat.class);
+      job.setMapperClass(CIMapper.class);
+      job.setReducerClass(CIReducer.class);
+      
+      FileInputFormat.addInputPath(job, inPath);
+      clustersOut = new Path(outPath, Cluster.CLUSTERS_DIR + iteration);
+      priorPath = clustersOut;
+      FileOutputFormat.setOutputPath(job, clustersOut);
+      
+      job.setJarByClass(ClusterIterator.class);
+      if (!job.waitForCompletion(true)) {
+        throw new InterruptedException("Cluster Iteration " + iteration + " 
failed processing " + priorPath);
+      }
+      ClusterClassifier.writePolicy(policy, clustersOut);
+      FileSystem fs = FileSystem.get(outPath.toUri(), conf);
+      iteration++;
+      if (isConverged(clustersOut, conf, fs)) {
+        break;
+      }
+    }
+    Path finalClustersIn = new Path(outPath, Cluster.CLUSTERS_DIR + (iteration 
- 1) + Cluster.FINAL_ITERATION_SUFFIX);
+    FileSystem.get(clustersOut.toUri(), conf).rename(clustersOut, 
finalClustersIn);
+  }
+  
+  /**
+   * Return if all of the Clusters in the parts in the filePath have converged 
or not
+   * 
+   * @param filePath
+   *          the file path to the single file containing the clusters
+   * @return true if all Clusters are converged
+   * @throws IOException
+   *           if there was an IO error
+   */
+  private static boolean isConverged(Path filePath, Configuration conf, 
FileSystem fs) throws IOException {
+    for (FileStatus part : fs.listStatus(filePath, PathFilters.partFilter())) {
+      SequenceFileValueIterator<ClusterWritable> iterator = new 
SequenceFileValueIterator<>(
+          part.getPath(), true, conf);
+      while (iterator.hasNext()) {
+        ClusterWritable value = iterator.next();
+        if (!value.getValue().isConverged()) {
+          Closeables.close(iterator, true);
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/iterator/ClusterWritable.java
----------------------------------------------------------------------
diff --git 
a/mr/src/main/java/org/apache/mahout/clustering/iterator/ClusterWritable.java 
b/mr/src/main/java/org/apache/mahout/clustering/iterator/ClusterWritable.java
new file mode 100644
index 0000000..855685f
--- /dev/null
+++ 
b/mr/src/main/java/org/apache/mahout/clustering/iterator/ClusterWritable.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.iterator;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.classifier.sgd.PolymorphicWritable;
+import org.apache.mahout.clustering.Cluster;
+
+public class ClusterWritable implements Writable {
+  
+  private Cluster value;
+  
+  public ClusterWritable(Cluster first) {
+    value = first;
+  }
+
+  public ClusterWritable() {
+  }
+
+  public Cluster getValue() {
+    return value;
+  }
+  
+  public void setValue(Cluster value) {
+    this.value = value;
+  }
+  
+  @Override
+  public void write(DataOutput out) throws IOException {
+    PolymorphicWritable.write(out, value);
+  }
+  
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    value = PolymorphicWritable.read(in, Cluster.class);
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/iterator/ClusteringPolicy.java
----------------------------------------------------------------------
diff --git 
a/mr/src/main/java/org/apache/mahout/clustering/iterator/ClusteringPolicy.java 
b/mr/src/main/java/org/apache/mahout/clustering/iterator/ClusteringPolicy.java
new file mode 100644
index 0000000..6e15838
--- /dev/null
+++ 
b/mr/src/main/java/org/apache/mahout/clustering/iterator/ClusteringPolicy.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.iterator;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.clustering.classify.ClusterClassifier;
+import org.apache.mahout.math.Vector;
+
+/**
+ * A ClusteringPolicy captures the semantics of assignment of points to 
clusters
+ * 
+ */
+public interface ClusteringPolicy extends Writable {
+  
+  /**
+   * Classify the data vector given the classifier's models
+   * 
+   * @param data
+   *          a data Vector
+   * @param prior
+   *          a prior ClusterClassifier
+   * @return a Vector of probabilities that the data is described by each of 
the
+   *         models
+   */
+  Vector classify(Vector data, ClusterClassifier prior);
+  
+  /**
+   * Return a vector of weights for each of the models given those 
probabilities
+   * 
+   * @param probabilities
+   *          a Vector of pdfs
+   * @return a Vector of weights
+   */
+  Vector select(Vector probabilities);
+  
+  /**
+   * Update the policy with the given classifier
+   * 
+   * @param posterior
+   *          a ClusterClassifier
+   */
+  void update(ClusterClassifier posterior);
+  
+  /**
+   * Close the policy using the classifier's models
+   * 
+   * @param posterior
+   *          a posterior ClusterClassifier
+   */
+  void close(ClusterClassifier posterior);
+  
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/iterator/ClusteringPolicyWritable.java
----------------------------------------------------------------------
diff --git 
a/mr/src/main/java/org/apache/mahout/clustering/iterator/ClusteringPolicyWritable.java
 
b/mr/src/main/java/org/apache/mahout/clustering/iterator/ClusteringPolicyWritable.java
new file mode 100644
index 0000000..f69442d
--- /dev/null
+++ 
b/mr/src/main/java/org/apache/mahout/clustering/iterator/ClusteringPolicyWritable.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.iterator;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.classifier.sgd.PolymorphicWritable;
+
+public class ClusteringPolicyWritable implements Writable {
+  
+  private ClusteringPolicy value;
+  
+  public ClusteringPolicyWritable(ClusteringPolicy policy) {
+    this.value = policy;
+  }
+
+  public ClusteringPolicyWritable() {
+  }
+
+  public ClusteringPolicy getValue() {
+    return value;
+  }
+  
+  public void setValue(ClusteringPolicy value) {
+    this.value = value;
+  }
+  
+  @Override
+  public void write(DataOutput out) throws IOException {
+    PolymorphicWritable.write(out, value);
+  }
+  
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    value = PolymorphicWritable.read(in, ClusteringPolicy.class);
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/iterator/DistanceMeasureCluster.java
----------------------------------------------------------------------
diff --git 
a/mr/src/main/java/org/apache/mahout/clustering/iterator/DistanceMeasureCluster.java
 
b/mr/src/main/java/org/apache/mahout/clustering/iterator/DistanceMeasureCluster.java
new file mode 100644
index 0000000..f61aa27
--- /dev/null
+++ 
b/mr/src/main/java/org/apache/mahout/clustering/iterator/DistanceMeasureCluster.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.iterator;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.mahout.clustering.AbstractCluster;
+import org.apache.mahout.clustering.Model;
+import org.apache.mahout.common.ClassUtils;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+public class DistanceMeasureCluster extends AbstractCluster {
+
+  private DistanceMeasure measure;
+
+  public DistanceMeasureCluster(Vector point, int id, DistanceMeasure measure) 
{
+    super(point, id);
+    this.measure = measure;
+  }
+
+  public DistanceMeasureCluster() {
+  }
+
+  @Override
+  public void configure(Configuration job) {
+    if (measure != null) {
+      measure.configure(job);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    String dm = in.readUTF();
+    this.measure = ClassUtils.instantiateAs(dm, DistanceMeasure.class);
+    super.readFields(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeUTF(measure.getClass().getName());
+    super.write(out);
+  }
+
+  @Override
+  public double pdf(VectorWritable vw) {
+    return 1 / (1 + measure.distance(vw.get(), getCenter()));
+  }
+
+  @Override
+  public Model<VectorWritable> sampleFromPosterior() {
+    return new DistanceMeasureCluster(getCenter(), getId(), measure);
+  }
+
+  public DistanceMeasure getMeasure() {
+    return measure;
+  }
+
+  /**
+   * @param measure
+   *          the measure to set
+   */
+  public void setMeasure(DistanceMeasure measure) {
+    this.measure = measure;
+  }
+
+  @Override
+  public String getIdentifier() {
+    return "DMC:" + getId();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/iterator/FuzzyKMeansClusteringPolicy.java
----------------------------------------------------------------------
diff --git 
a/mr/src/main/java/org/apache/mahout/clustering/iterator/FuzzyKMeansClusteringPolicy.java
 
b/mr/src/main/java/org/apache/mahout/clustering/iterator/FuzzyKMeansClusteringPolicy.java
new file mode 100644
index 0000000..bc91f24
--- /dev/null
+++ 
b/mr/src/main/java/org/apache/mahout/clustering/iterator/FuzzyKMeansClusteringPolicy.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.iterator;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.classify.ClusterClassifier;
+import org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansClusterer;
+import org.apache.mahout.clustering.fuzzykmeans.SoftCluster;
+import org.apache.mahout.math.Vector;
+
+import com.google.common.collect.Lists;
+
+/**
+ * This is a probability-weighted clustering policy, suitable for fuzzy k-means
+ * clustering
+ * 
+ */
+public class FuzzyKMeansClusteringPolicy extends AbstractClusteringPolicy {
+
+  private double m = 2;
+  private double convergenceDelta = 0.05;
+
+  public FuzzyKMeansClusteringPolicy() {
+  }
+
+  public FuzzyKMeansClusteringPolicy(double m, double convergenceDelta) {
+    this.m = m;
+    this.convergenceDelta = convergenceDelta;
+  }
+
+  @Override
+  public Vector select(Vector probabilities) {
+    return probabilities;
+  }
+  
+  @Override
+  public Vector classify(Vector data, ClusterClassifier prior) {
+    Collection<SoftCluster> clusters = Lists.newArrayList();
+    List<Double> distances = Lists.newArrayList();
+    for (Cluster model : prior.getModels()) {
+      SoftCluster sc = (SoftCluster) model;
+      clusters.add(sc);
+      distances.add(sc.getMeasure().distance(data, sc.getCenter()));
+    }
+    FuzzyKMeansClusterer fuzzyKMeansClusterer = new FuzzyKMeansClusterer();
+    fuzzyKMeansClusterer.setM(m);
+    return fuzzyKMeansClusterer.computePi(clusters, distances);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeDouble(m);
+    out.writeDouble(convergenceDelta);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.m = in.readDouble();
+    this.convergenceDelta = in.readDouble();
+  }
+
+  @Override
+  public void close(ClusterClassifier posterior) {
+    for (Cluster cluster : posterior.getModels()) {
+      ((org.apache.mahout.clustering.kmeans.Kluster) 
cluster).calculateConvergence(convergenceDelta);
+      cluster.computeParameters();
+    }
+    
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/iterator/KMeansClusteringPolicy.java
----------------------------------------------------------------------
diff --git 
a/mr/src/main/java/org/apache/mahout/clustering/iterator/KMeansClusteringPolicy.java
 
b/mr/src/main/java/org/apache/mahout/clustering/iterator/KMeansClusteringPolicy.java
new file mode 100644
index 0000000..1cc9faf
--- /dev/null
+++ 
b/mr/src/main/java/org/apache/mahout/clustering/iterator/KMeansClusteringPolicy.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.iterator;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.classify.ClusterClassifier;
+
+/**
+ * This is a simple maximum likelihood clustering policy, suitable for k-means
+ * clustering
+ * 
+ */
+public class KMeansClusteringPolicy extends AbstractClusteringPolicy {
+  
+  public KMeansClusteringPolicy() {
+  }
+  
+  public KMeansClusteringPolicy(double convergenceDelta) {
+    this.convergenceDelta = convergenceDelta;
+  }
+  
+  private double convergenceDelta = 0.001;
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeDouble(convergenceDelta);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.convergenceDelta = in.readDouble();
+  }
+  
+  @Override
+  public void close(ClusterClassifier posterior) {
+    boolean allConverged = true;
+    for (Cluster cluster : posterior.getModels()) {
+      org.apache.mahout.clustering.kmeans.Kluster kluster = 
(org.apache.mahout.clustering.kmeans.Kluster) cluster;
+      boolean converged = kluster.calculateConvergence(convergenceDelta);
+      allConverged = allConverged && converged;
+      cluster.computeParameters();
+    }
+    
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/kernel/IKernelProfile.java
----------------------------------------------------------------------
diff --git 
a/mr/src/main/java/org/apache/mahout/clustering/kernel/IKernelProfile.java 
b/mr/src/main/java/org/apache/mahout/clustering/kernel/IKernelProfile.java
new file mode 100644
index 0000000..96c4082
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/clustering/kernel/IKernelProfile.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.kernel;
+
+public interface IKernelProfile {
+
+  /**
+   * @return the calculated dervative value of the kernel
+   */
+  double calculateDerivativeValue(double distance, double h);
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/kernel/TriangularKernelProfile.java
----------------------------------------------------------------------
diff --git 
a/mr/src/main/java/org/apache/mahout/clustering/kernel/TriangularKernelProfile.java
 
b/mr/src/main/java/org/apache/mahout/clustering/kernel/TriangularKernelProfile.java
new file mode 100644
index 0000000..46909bb
--- /dev/null
+++ 
b/mr/src/main/java/org/apache/mahout/clustering/kernel/TriangularKernelProfile.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.kernel;
+
+public class TriangularKernelProfile implements IKernelProfile {
+  
+  @Override
+  public double calculateDerivativeValue(double distance, double h) {
+    return distance < h ? 1.0 : 0.0;
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java
----------------------------------------------------------------------
diff --git 
a/mr/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java 
b/mr/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java
new file mode 100644
index 0000000..13f6b46
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java
@@ -0,0 +1,257 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.kmeans;
+
+import java.io.IOException;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.classify.ClusterClassificationDriver;
+import org.apache.mahout.clustering.classify.ClusterClassifier;
+import org.apache.mahout.clustering.iterator.ClusterIterator;
+import org.apache.mahout.clustering.iterator.ClusteringPolicy;
+import org.apache.mahout.clustering.iterator.KMeansClusteringPolicy;
+import org.apache.mahout.clustering.topdown.PathDirectory;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.ClassUtils;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.distance.SquaredEuclideanDistanceMeasure;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KMeansDriver extends AbstractJob {
+  
+  private static final Logger log = 
LoggerFactory.getLogger(KMeansDriver.class);
+  
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new Configuration(), new KMeansDriver(), args);
+  }
+  
+  @Override
+  public int run(String[] args) throws Exception {
+    
+    addInputOption();
+    addOutputOption();
+    addOption(DefaultOptionCreator.distanceMeasureOption().create());
+    addOption(DefaultOptionCreator
+        .clustersInOption()
+        .withDescription(
+            "The input centroids, as Vectors.  Must be a SequenceFile of 
Writable, Cluster/Canopy.  "
+                + "If k is also specified, then a random set of vectors will 
be selected"
+                + " and written out to this path first").create());
+    addOption(DefaultOptionCreator
+        .numClustersOption()
+        .withDescription(
+            "The k in k-Means.  If specified, then a random selection of k 
Vectors will be chosen"
+                + " as the Centroid and written to the clusters input 
path.").create());
+    addOption(DefaultOptionCreator.useSetRandomSeedOption().create());
+    addOption(DefaultOptionCreator.convergenceOption().create());
+    addOption(DefaultOptionCreator.maxIterationsOption().create());
+    addOption(DefaultOptionCreator.overwriteOption().create());
+    addOption(DefaultOptionCreator.clusteringOption().create());
+    addOption(DefaultOptionCreator.methodOption().create());
+    addOption(DefaultOptionCreator.outlierThresholdOption().create());
+   
+    if (parseArguments(args) == null) {
+      return -1;
+    }
+    
+    Path input = getInputPath();
+    Path clusters = new 
Path(getOption(DefaultOptionCreator.CLUSTERS_IN_OPTION));
+    Path output = getOutputPath();
+    String measureClass = 
getOption(DefaultOptionCreator.DISTANCE_MEASURE_OPTION);
+    if (measureClass == null) {
+      measureClass = SquaredEuclideanDistanceMeasure.class.getName();
+    }
+    double convergenceDelta = 
Double.parseDouble(getOption(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION));
+    int maxIterations = 
Integer.parseInt(getOption(DefaultOptionCreator.MAX_ITERATIONS_OPTION));
+    if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
+      HadoopUtil.delete(getConf(), output);
+    }
+    DistanceMeasure measure = ClassUtils.instantiateAs(measureClass, 
DistanceMeasure.class);
+    
+    if (hasOption(DefaultOptionCreator.NUM_CLUSTERS_OPTION)) {
+      int numClusters = 
Integer.parseInt(getOption(DefaultOptionCreator.NUM_CLUSTERS_OPTION));
+
+      Long seed = null;
+      if (hasOption(DefaultOptionCreator.RANDOM_SEED)) {
+        seed = Long.parseLong(getOption(DefaultOptionCreator.RANDOM_SEED));
+      }
+
+      clusters = RandomSeedGenerator.buildRandom(getConf(), input, clusters, 
numClusters, measure, seed);
+    }
+    boolean runClustering = hasOption(DefaultOptionCreator.CLUSTERING_OPTION);
+    boolean runSequential = 
getOption(DefaultOptionCreator.METHOD_OPTION).equalsIgnoreCase(
+        DefaultOptionCreator.SEQUENTIAL_METHOD);
+    double clusterClassificationThreshold = 0.0;
+    if (hasOption(DefaultOptionCreator.OUTLIER_THRESHOLD)) {
+      clusterClassificationThreshold = 
Double.parseDouble(getOption(DefaultOptionCreator.OUTLIER_THRESHOLD));
+    }
+    run(getConf(), input, clusters, output, convergenceDelta, maxIterations, 
runClustering,
+        clusterClassificationThreshold, runSequential);
+    return 0;
+  }
+  
+  /**
+   * Iterate over the input vectors to produce clusters and, if requested, use 
the results of the final iteration to
+   * cluster the input vectors.
+   *
+   * @param input
+   *          the directory pathname for input points
+   * @param clustersIn
+   *          the directory pathname for initial & computed clusters
+   * @param output
+   *          the directory pathname for output points
+   * @param convergenceDelta
+   *          the convergence delta value
+   * @param maxIterations
+   *          the maximum number of iterations
+   * @param runClustering
+   *          true if points are to be clustered after iterations are completed
+   * @param clusterClassificationThreshold
+   *          Is a clustering strictness / outlier removal parameter. Its 
value should be between 0 and 1. Vectors
+   *          having pdf below this value will not be clustered.
+   * @param runSequential
+   *          if true execute sequential algorithm
+   */
+  public static void run(Configuration conf, Path input, Path clustersIn, Path 
output,
+    double convergenceDelta, int maxIterations, boolean runClustering, double 
clusterClassificationThreshold,
+    boolean runSequential) throws IOException, InterruptedException, 
ClassNotFoundException {
+    
+    // iterate until the clusters converge
+    String delta = Double.toString(convergenceDelta);
+    if (log.isInfoEnabled()) {
+      log.info("Input: {} Clusters In: {} Out: {}", input, clustersIn, output);
+      log.info("convergence: {} max Iterations: {}", convergenceDelta, 
maxIterations);
+    }
+    Path clustersOut = buildClusters(conf, input, clustersIn, output, 
maxIterations, delta, runSequential);
+    if (runClustering) {
+      log.info("Clustering data");
+      clusterData(conf, input, clustersOut, output, 
clusterClassificationThreshold, runSequential);
+    }
+  }
+  
+  /**
+   * Iterate over the input vectors to produce clusters and, if requested, use 
the results of the final iteration to
+   * cluster the input vectors.
+   *
+   * @param input
+   *          the directory pathname for input points
+   * @param clustersIn
+   *          the directory pathname for initial & computed clusters
+   * @param output
+   *          the directory pathname for output points
+   * @param convergenceDelta
+   *          the convergence delta value
+   * @param maxIterations
+   *          the maximum number of iterations
+   * @param runClustering
+   *          true if points are to be clustered after iterations are completed
+   * @param clusterClassificationThreshold
+   *          Is a clustering strictness / outlier removal parameter. Its 
value should be between 0 and 1. Vectors
+   *          having pdf below this value will not be clustered.
+   * @param runSequential
+   *          if true execute sequential algorithm
+   */
+  public static void run(Path input, Path clustersIn, Path output, double 
convergenceDelta,
+    int maxIterations, boolean runClustering, double 
clusterClassificationThreshold, boolean runSequential)
+    throws IOException, InterruptedException, ClassNotFoundException {
+    run(new Configuration(), input, clustersIn, output, convergenceDelta, 
maxIterations, runClustering,
+        clusterClassificationThreshold, runSequential);
+  }
+  
+  /**
+   * Iterate over the input vectors to produce cluster directories for each 
iteration
+   * 
+   *
+   * @param conf
+   *          the Configuration to use
+   * @param input
+   *          the directory pathname for input points
+   * @param clustersIn
+   *          the directory pathname for initial & computed clusters
+   * @param output
+   *          the directory pathname for output points
+   * @param maxIterations
+   *          the maximum number of iterations
+   * @param delta
+   *          the convergence delta value
+   * @param runSequential
+   *          if true execute sequential algorithm
+   *
+   * @return the Path of the final clusters directory
+   */
+  public static Path buildClusters(Configuration conf, Path input, Path 
clustersIn, Path output,
+    int maxIterations, String delta, boolean runSequential) throws IOException,
+    InterruptedException, ClassNotFoundException {
+    
+    double convergenceDelta = Double.parseDouble(delta);
+    List<Cluster> clusters = Lists.newArrayList();
+    KMeansUtil.configureWithClusterInfo(conf, clustersIn, clusters);
+    
+    if (clusters.isEmpty()) {
+      throw new IllegalStateException("No input clusters found in " + 
clustersIn + ". Check your -c argument.");
+    }
+    
+    Path priorClustersPath = new Path(output, Cluster.INITIAL_CLUSTERS_DIR);
+    ClusteringPolicy policy = new KMeansClusteringPolicy(convergenceDelta);
+    ClusterClassifier prior = new ClusterClassifier(clusters, policy);
+    prior.writeToSeqFiles(priorClustersPath);
+    
+    if (runSequential) {
+      ClusterIterator.iterateSeq(conf, input, priorClustersPath, output, 
maxIterations);
+    } else {
+      ClusterIterator.iterateMR(conf, input, priorClustersPath, output, 
maxIterations);
+    }
+    return output;
+  }
+  
+  /**
+   * Run the job using supplied arguments
+   *
+   * @param input
+   *          the directory pathname for input points
+   * @param clustersIn
+   *          the directory pathname for input clusters
+   * @param output
+   *          the directory pathname for output points
+   * @param clusterClassificationThreshold
+   *          Is a clustering strictness / outlier removal parameter. Its 
value should be between 0 and 1. Vectors
+   *          having pdf below this value will not be clustered.
+   * @param runSequential
+   *          if true execute sequential algorithm
+   */
+  public static void clusterData(Configuration conf, Path input, Path 
clustersIn, Path output,
+    double clusterClassificationThreshold, boolean runSequential) throws 
IOException, InterruptedException,
+    ClassNotFoundException {
+    
+    if (log.isInfoEnabled()) {
+      log.info("Running Clustering");
+      log.info("Input: {} Clusters In: {} Out: {}", input, clustersIn, output);
+    }
+    ClusterClassifier.writePolicy(new KMeansClusteringPolicy(), clustersIn);
+    ClusterClassificationDriver.run(conf, input, output, new Path(output, 
PathDirectory.CLUSTERED_POINTS_DIRECTORY),
+        clusterClassificationThreshold, true, runSequential);
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/kmeans/KMeansUtil.java
----------------------------------------------------------------------
diff --git 
a/mr/src/main/java/org/apache/mahout/clustering/kmeans/KMeansUtil.java 
b/mr/src/main/java/org/apache/mahout/clustering/kmeans/KMeansUtil.java
new file mode 100644
index 0000000..3365f70
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/clustering/kmeans/KMeansUtil.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.kmeans;
+
+import java.util.Collection;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.clustering.Cluster;
+import org.apache.mahout.clustering.canopy.Canopy;
+import org.apache.mahout.clustering.iterator.ClusterWritable;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import 
org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class KMeansUtil {
+  
+  private static final Logger log = LoggerFactory.getLogger(KMeansUtil.class);
+
+  private KMeansUtil() {}
+  
+  /**
+   * Create a list of Klusters from whatever Cluster type is passed in as the 
prior
+   * 
+   * @param conf
+   *          the Configuration
+   * @param clusterPath
+   *          the path to the prior Clusters
+   * @param clusters
+   *          a List<Cluster> to put values into
+   */
+  public static void configureWithClusterInfo(Configuration conf, Path 
clusterPath, Collection<Cluster> clusters) {
+    for (Writable value : new SequenceFileDirValueIterable<>(clusterPath, 
PathType.LIST,
+        PathFilters.partFilter(), conf)) {
+      Class<? extends Writable> valueClass = value.getClass();
+      if (valueClass.equals(ClusterWritable.class)) {
+        ClusterWritable clusterWritable = (ClusterWritable) value;
+        value = clusterWritable.getValue();
+        valueClass = value.getClass();
+      }
+      log.debug("Read 1 Cluster from {}", clusterPath);
+      
+      if (valueClass.equals(Kluster.class)) {
+        // get the cluster info
+        clusters.add((Kluster) value);
+      } else if (valueClass.equals(Canopy.class)) {
+        // get the cluster info
+        Canopy canopy = (Canopy) value;
+        clusters.add(new Kluster(canopy.getCenter(), canopy.getId(), 
canopy.getMeasure()));
+      } else {
+        throw new IllegalStateException("Bad value class: " + valueClass);
+      }
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/kmeans/Kluster.java
----------------------------------------------------------------------
diff --git a/mr/src/main/java/org/apache/mahout/clustering/kmeans/Kluster.java 
b/mr/src/main/java/org/apache/mahout/clustering/kmeans/Kluster.java
new file mode 100644
index 0000000..15daec5
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/clustering/kmeans/Kluster.java
@@ -0,0 +1,117 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.clustering.kmeans;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.mahout.clustering.iterator.DistanceMeasureCluster;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.math.Vector;
+
+public class Kluster extends DistanceMeasureCluster {
+  
+  /** Has the centroid converged with the center? */
+  private boolean converged;
+  
+  /** For (de)serialization as a Writable */
+  public Kluster() {
+  }
+  
+  /**
+   * Construct a new cluster with the given point as its center
+   * 
+   * @param center
+   *          the Vector center
+   * @param clusterId
+   *          the int cluster id
+   * @param measure
+   *          a DistanceMeasure
+   */
+  public Kluster(Vector center, int clusterId, DistanceMeasure measure) {
+    super(center, clusterId, measure);
+  }
+  
+  /**
+   * Format the cluster for output
+   * 
+   * @param cluster
+   *          the Cluster
+   * @return the String representation of the Cluster
+   */
+  public static String formatCluster(Kluster cluster) {
+    return cluster.getIdentifier() + ": " + 
cluster.computeCentroid().asFormatString();
+  }
+  
+  public String asFormatString() {
+    return formatCluster(this);
+  }
+  
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    out.writeBoolean(converged);
+  }
+  
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    this.converged = in.readBoolean();
+  }
+  
+  @Override
+  public String toString() {
+    return asFormatString(null);
+  }
+  
+  @Override
+  public String getIdentifier() {
+    return (converged ? "VL-" : "CL-") + getId();
+  }
+  
+  /**
+   * Return if the cluster is converged by comparing its center and centroid.
+   * 
+   * @param measure
+   *          The distance measure to use for cluster-point comparisons.
+   * @param convergenceDelta
+   *          the convergence delta to use for stopping.
+   * @return if the cluster is converged
+   */
+  public boolean computeConvergence(DistanceMeasure measure, double 
convergenceDelta) {
+    Vector centroid = computeCentroid();
+    converged = measure.distance(centroid.getLengthSquared(), centroid, 
getCenter()) <= convergenceDelta;
+    return converged;
+  }
+  
+  @Override
+  public boolean isConverged() {
+    return converged;
+  }
+  
+  protected void setConverged(boolean converged) {
+    this.converged = converged;
+  }
+  
+  public boolean calculateConvergence(double convergenceDelta) {
+    Vector centroid = computeCentroid();
+    converged = getMeasure().distance(centroid.getLengthSquared(), centroid, 
getCenter()) <= convergenceDelta;
+    return converged;
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/kmeans/RandomSeedGenerator.java
----------------------------------------------------------------------
diff --git 
a/mr/src/main/java/org/apache/mahout/clustering/kmeans/RandomSeedGenerator.java 
b/mr/src/main/java/org/apache/mahout/clustering/kmeans/RandomSeedGenerator.java
new file mode 100644
index 0000000..cc9e4cd
--- /dev/null
+++ 
b/mr/src/main/java/org/apache/mahout/clustering/kmeans/RandomSeedGenerator.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.clustering.kmeans;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.clustering.iterator.ClusterWritable;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.RandomUtils;
+import org.apache.mahout.common.distance.DistanceMeasure;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
+import org.apache.mahout.math.VectorWritable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Given an Input Path containing a {@link org.apache.hadoop.io.SequenceFile}, 
randomly select k vectors and
+ * write them to the output file as a {@link 
org.apache.mahout.clustering.kmeans.Kluster} representing the
+ * initial centroid to use.
+ *
+ * This implementation uses reservoir sampling as described in 
http://en.wikipedia.org/wiki/Reservoir_sampling
+ */
+public final class RandomSeedGenerator {
+  
+  private static final Logger log = 
LoggerFactory.getLogger(RandomSeedGenerator.class);
+  
+  public static final String K = "k";
+  
+  private RandomSeedGenerator() {}
+
+  public static Path buildRandom(Configuration conf, Path input, Path output, 
int k, DistanceMeasure measure)
+    throws IOException {
+    return buildRandom(conf, input, output, k, measure, null);
+  }
+
+  public static Path buildRandom(Configuration conf,
+                                 Path input,
+                                 Path output,
+                                 int k,
+                                 DistanceMeasure measure,
+                                 Long seed) throws IOException {
+
+    Preconditions.checkArgument(k > 0, "Must be: k > 0, but k = " + k);
+    // delete the output directory
+    FileSystem fs = FileSystem.get(output.toUri(), conf);
+    HadoopUtil.delete(conf, output);
+    Path outFile = new Path(output, "part-randomSeed");
+    boolean newFile = fs.createNewFile(outFile);
+    if (newFile) {
+      Path inputPathPattern;
+
+      if (fs.getFileStatus(input).isDir()) {
+        inputPathPattern = new Path(input, "*");
+      } else {
+        inputPathPattern = input;
+      }
+      
+      FileStatus[] inputFiles = fs.globStatus(inputPathPattern, 
PathFilters.logsCRCFilter());
+      SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, 
outFile, Text.class, ClusterWritable.class);
+
+      Random random = (seed != null) ? RandomUtils.getRandom(seed) : 
RandomUtils.getRandom();
+
+      List<Text> chosenTexts = Lists.newArrayListWithCapacity(k);
+      List<ClusterWritable> chosenClusters = Lists.newArrayListWithCapacity(k);
+      int nextClusterId = 0;
+
+      int index = 0;
+      for (FileStatus fileStatus : inputFiles) {
+        if (!fileStatus.isDir()) {
+          for (Pair<Writable, VectorWritable> record
+              : new SequenceFileIterable<Writable, 
VectorWritable>(fileStatus.getPath(), true, conf)) {
+            Writable key = record.getFirst();
+            VectorWritable value = record.getSecond();
+            Kluster newCluster = new Kluster(value.get(), nextClusterId++, 
measure);
+            newCluster.observe(value.get(), 1);
+            Text newText = new Text(key.toString());
+            int currentSize = chosenTexts.size();
+            if (currentSize < k) {
+              chosenTexts.add(newText);
+              ClusterWritable clusterWritable = new ClusterWritable();
+              clusterWritable.setValue(newCluster);
+              chosenClusters.add(clusterWritable);
+            } else {
+              int j = random.nextInt(index);
+              if (j < k) {
+                chosenTexts.set(j, newText);
+                ClusterWritable clusterWritable = new ClusterWritable();
+                clusterWritable.setValue(newCluster);
+                chosenClusters.set(j, clusterWritable);
+              }
+            }
+            index++;
+          }
+        }
+      }
+
+      try {
+        for (int i = 0; i < chosenTexts.size(); i++) {
+          writer.append(chosenTexts.get(i), chosenClusters.get(i));
+        }
+        log.info("Wrote {} Klusters to {}", k, outFile);
+      } finally {
+        Closeables.close(writer, false);
+      }
+    }
+    
+    return outFile;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/b988c493/mr/src/main/java/org/apache/mahout/clustering/kmeans/package-info.java
----------------------------------------------------------------------
diff --git 
a/mr/src/main/java/org/apache/mahout/clustering/kmeans/package-info.java 
b/mr/src/main/java/org/apache/mahout/clustering/kmeans/package-info.java
new file mode 100644
index 0000000..d6921b6
--- /dev/null
+++ b/mr/src/main/java/org/apache/mahout/clustering/kmeans/package-info.java
@@ -0,0 +1,5 @@
+/**
+ * This package provides an implementation of the <a 
href="http://en.wikipedia.org/wiki/Kmeans";>k-means</a> clustering
+ * algorithm.
+ */
+package org.apache.mahout.clustering.kmeans;

Reply via email to