Author: jeastman
Date: Thu Jul 21 21:13:02 2011
New Revision: 1149369
URL: http://svn.apache.org/viewvc?rev=1149369&view=rev
Log:
MAHOUT-749: Implemented multiple reducer approach from Jira patch, plus a
scalability enhancement to avoid accumulating merged clusterIds if -cl option
is not present. The defaults preserve the same behavior as before. All tests
pass, though this needs more testing to see how it really scales.
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopy.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyClusterer.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyConfigKeys.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyDriver.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyMapper.java
mahout/trunk/core/src/test/java/org/apache/mahout/clustering/meanshift/TestMeanShift.java
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayMeanShift.java
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopy.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopy.java?rev=1149369&r1=1149368&r2=1149369&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopy.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopy.java
Thu Jul 21 21:13:02 2011
@@ -34,16 +34,27 @@ import org.apache.mahout.math.list.IntAr
* centroid when needed.
*/
public class MeanShiftCanopy extends Cluster {
-
+
// TODO: this is still problematic from a scalability perspective, but how
// else to encode membership?
private IntArrayList boundPoints = new IntArrayList();
-
+
+ private int mass = 0;
+
+ public int getMass() {
+ return mass;
+ }
+
+ void setMass(int num) {
+ mass = num;
+ }
+
/**
* Used for Writable
*/
- public MeanShiftCanopy() {}
-
+ public MeanShiftCanopy() {
+ }
+
/**
* Create a new Canopy containing the given point
*
@@ -57,8 +68,9 @@ public class MeanShiftCanopy extends Clu
public MeanShiftCanopy(Vector point, int id, DistanceMeasure measure) {
super(point, id, measure);
boundPoints.add(id);
+ mass = 1;
}
-
+
/**
* Create an initial Canopy, retaining the original type of the given point
* (e.g. NamedVector)
@@ -78,43 +90,26 @@ public class MeanShiftCanopy extends Clu
result.setCenter(point);
return result;
}
-
- /**
- * Create a new Canopy containing the given point, id and bound points
- *
- * @param point
- * a Vector
- * @param id
- * an int identifying the canopy local to this process only
- * @param boundPoints
- * a IntArrayList containing points ids bound to the canopy
- * @param converged
- * true if the canopy has converged
- */
- MeanShiftCanopy(Vector point, int id, IntArrayList boundPoints,
- boolean converged) {
- this.setId(id);
- this.setCenter(point);
- this.setRadius(point.like());
- this.setNumPoints(1);
- this.boundPoints = boundPoints;
- setConverged(converged);
- }
-
+
public IntArrayList getBoundPoints() {
return boundPoints;
}
-
+
/**
* The receiver overlaps the given canopy. Add my bound points to it.
*
* @param canopy
* an existing MeanShiftCanopy
+ * @param accumulateBoundPoints
+ * true to accumulate bound points from the canopy
*/
- void merge(MeanShiftCanopy canopy) {
- boundPoints.addAllOf(canopy.boundPoints);
+ void merge(MeanShiftCanopy canopy, boolean accumulateBoundPoints) {
+ if (accumulateBoundPoints) {
+ boundPoints.addAllOf(canopy.boundPoints);
+ }
+ mass += canopy.mass;
}
-
+
/**
* The receiver touches the given canopy. Add respective centers with the
* given weights.
@@ -125,29 +120,32 @@ public class MeanShiftCanopy extends Clu
* double weight of the touching
*/
void touch(MeanShiftCanopy canopy, double weight) {
- canopy.observe(getCenter(), weight * boundPoints.size());
- observe(canopy.getCenter(), weight * canopy.boundPoints.size());
+ canopy.observe(getCenter(), weight * mass);
+ observe(canopy.getCenter(), weight * canopy.mass);
}
-
+
@Override
public void readFields(DataInput in) throws IOException {
  super.readFields(in);
  // Mass is serialized explicitly (see write()) so it survives round-trips
  // even when bound points are NOT accumulated (i.e. the -cl option is
  // absent and merge() skips boundPoints). Read it back verbatim.
  this.mass = in.readInt();
  int numpoints = in.readInt();
  this.boundPoints = new IntArrayList();
  for (int i = 0; i < numpoints; i++) {
    this.boundPoints.add(in.readInt());
  }
  // BUG FIX: do not recompute mass as boundPoints.size() here. Doing so
  // clobbered the deserialized value and collapsed mass to 0 whenever
  // bound points were not being accumulated, defeating the scalability
  // enhancement this patch introduces and corrupting the weights used by
  // touch() and shiftToMean().
}
-
+
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
+ out.writeInt(mass);
out.writeInt(boundPoints.size());
for (int v : boundPoints.elements()) {
out.writeInt(v);
}
}
-
+
public MeanShiftCanopy shallowCopy() {
MeanShiftCanopy result = new MeanShiftCanopy();
result.setMeasure(this.getMeasure());
@@ -156,28 +154,29 @@ public class MeanShiftCanopy extends Clu
result.setRadius(this.getRadius());
result.setNumPoints(this.getNumPoints());
result.setBoundPoints(boundPoints);
+ result.setMass(mass);
return result;
}
-
+
@Override
public String asFormatString() {
return toString();
}
-
+
public void setBoundPoints(IntArrayList boundPoints) {
this.boundPoints = boundPoints;
}
-
+
@Override
public String getIdentifier() {
return (isConverged() ? "MSV-" : "MSC-") + getId();
}
-
+
@Override
public double pdf(VectorWritable vw) {
// MSCanopy membership is explicit via membership in boundPoints. Can't
// compute pdf for Arbitrary point
throw new UnsupportedOperationException();
}
-
+
}
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyClusterer.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyClusterer.java?rev=1149369&r1=1149368&r2=1149369&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyClusterer.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyClusterer.java
Thu Jul 21 21:13:02 2011
@@ -30,30 +30,34 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MeanShiftCanopyClusterer {
-
+
private static final Logger log = LoggerFactory
.getLogger(MeanShiftCanopyClusterer.class);
-
+
private final double convergenceDelta;
-
+
// the T1 distance threshold
private final double t1;
-
+
// the T2 distance threshold
private final double t2;
-
+
// the distance measure
private final DistanceMeasure measure;
-
+
private final IKernelProfile kernelProfile;
-
+
+ // if true accumulate clusters during merge so clusters can be produced later
+ private final boolean runClustering;
+
public MeanShiftCanopyClusterer(Configuration configuration) {
try {
- measure = Class
- .forName(
-
configuration.get(MeanShiftCanopyConfigKeys.DISTANCE_MEASURE_KEY))
+ measure = Class.forName(
+ configuration.get(MeanShiftCanopyConfigKeys.DISTANCE_MEASURE_KEY))
.asSubclass(DistanceMeasure.class).newInstance();
measure.configure(configuration);
+ runClustering = configuration.getBoolean(
+ MeanShiftCanopyConfigKeys.CLUSTER_POINTS_KEY, true);
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e);
} catch (IllegalAccessException e) {
@@ -62,9 +66,8 @@ public class MeanShiftCanopyClusterer {
throw new IllegalStateException(e);
}
try {
- kernelProfile = Class
- .forName(
- configuration.get(MeanShiftCanopyConfigKeys.KERNEL_PROFILE_KEY))
+ kernelProfile = Class.forName(
+ configuration.get(MeanShiftCanopyConfigKeys.KERNEL_PROFILE_KEY))
.asSubclass(IKernelProfile.class).newInstance();
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e);
@@ -81,26 +84,27 @@ public class MeanShiftCanopyClusterer {
convergenceDelta = Double.parseDouble(configuration
.get(MeanShiftCanopyConfigKeys.CLUSTER_CONVERGENCE_KEY));
}
-
+
public MeanShiftCanopyClusterer(DistanceMeasure aMeasure,
IKernelProfile aKernelProfileDerivative, double aT1, double aT2,
- double aDelta) {
+ double aDelta, boolean runClustering) {
// nextCanopyId = 100; // so canopyIds will sort properly // never read?
measure = aMeasure;
t1 = aT1;
t2 = aT2;
convergenceDelta = aDelta;
kernelProfile = aKernelProfileDerivative;
+ this.runClustering = runClustering;
}
-
+
public double getT1() {
return t1;
}
-
+
public double getT2() {
return t2;
}
-
+
/**
* Merge the given canopy into the canopies list. If it touches any existing
* canopy (norm<T1) then add the center of each to the other. If it covers
any
@@ -131,10 +135,10 @@ public class MeanShiftCanopyClusterer {
if (closestCoveringCanopy == null) {
canopies.add(aCanopy);
} else {
- closestCoveringCanopy.merge(aCanopy);
+ closestCoveringCanopy.merge(aCanopy, runClustering);
}
}
-
+
/**
* Shift the center to the new centroid of the cluster
*
@@ -143,12 +147,12 @@ public class MeanShiftCanopyClusterer {
* @return if the cluster is converged
*/
public boolean shiftToMean(MeanShiftCanopy canopy) {
- canopy.observe(canopy.getCenter(), canopy.getBoundPoints().size());
+ canopy.observe(canopy.getCenter(), canopy.getMass());
canopy.computeConvergence(measure, convergenceDelta);
canopy.computeParameters();
return canopy.isConverged();
}
-
+
/**
* Return if the point is covered by this canopy
*
@@ -161,7 +165,7 @@ public class MeanShiftCanopyClusterer {
boolean covers(MeanShiftCanopy canopy, Vector point) {
return measure.distance(canopy.getCenter(), point) < t1;
}
-
+
/**
* Return if the point is closely covered by the canopy
*
@@ -174,7 +178,7 @@ public class MeanShiftCanopyClusterer {
public boolean closelyBound(MeanShiftCanopy canopy, Vector point) {
return measure.distance(canopy.getCenter(), point) < t2;
}
-
+
/**
* This is the reference mean-shift implementation. Given its inputs it
* iterates over the points and clusters until their centers converge or
until
@@ -191,22 +195,22 @@ public class MeanShiftCanopyClusterer {
DistanceMeasure measure, IKernelProfile aKernelProfileDerivative,
double convergenceThreshold, double t1, double t2, int numIter) {
MeanShiftCanopyClusterer clusterer = new MeanShiftCanopyClusterer(measure,
- aKernelProfileDerivative, t1, t2, convergenceThreshold);
+ aKernelProfileDerivative, t1, t2, convergenceThreshold, true);
int nextCanopyId = 0;
-
+
List<MeanShiftCanopy> canopies = Lists.newArrayList();
for (Vector point : points) {
clusterer.mergeCanopy(
new MeanShiftCanopy(point, nextCanopyId++, measure), canopies);
}
List<MeanShiftCanopy> newCanopies = canopies;
- boolean[] converged = {false};
+ boolean[] converged = { false };
for (int iter = 0; !converged[0] && iter < numIter; iter++) {
newCanopies = clusterer.iterate(newCanopies, converged);
}
return newCanopies;
}
-
+
protected List<MeanShiftCanopy> iterate(Iterable<MeanShiftCanopy> canopies,
boolean[] converged) {
converged[0] = true;
@@ -217,22 +221,7 @@ public class MeanShiftCanopyClusterer {
}
return migratedCanopies;
}
-
- protected static void verifyNonOverlap(Iterable<MeanShiftCanopy> canopies) {
- Collection<Integer> coveredPoints = new HashSet<Integer>();
- // verify no overlap
- for (MeanShiftCanopy canopy : canopies) {
- for (int v : canopy.getBoundPoints().toList()) {
- if (coveredPoints.contains(v)) {
- log.info("Duplicate bound point: {} in Canopy: {}", v,
- canopy.asFormatString(null));
- } else {
- coveredPoints.add(v);
- }
- }
- }
- }
-
+
protected static MeanShiftCanopy findCoveringCanopy(MeanShiftCanopy canopy,
Iterable<MeanShiftCanopy> clusters) {
// canopies use canopyIds assigned when input vectors are processed as
@@ -247,5 +236,5 @@ public class MeanShiftCanopyClusterer {
}
return null;
}
-
+
}
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyConfigKeys.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyConfigKeys.java?rev=1149369&r1=1149368&r2=1149369&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyConfigKeys.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyConfigKeys.java
Thu Jul 21 21:13:02 2011
@@ -18,13 +18,14 @@
package org.apache.mahout.clustering.meanshift;
public interface MeanShiftCanopyConfigKeys {
-
- // keys used by Driver, Mapper, Combiner & Reducer
- String DISTANCE_MEASURE_KEY = "org.apache.mahout.clustering.canopy.measure";
- String KERNEL_PROFILE_KEY =
"org.apache.mahout.clustering.canopy.kernelprofile";
- String T1_KEY = "org.apache.mahout.clustering.canopy.t1";
- String T2_KEY = "org.apache.mahout.clustering.canopy.t2";
- String CONTROL_PATH_KEY = "org.apache.mahout.clustering.control.path";
- String CLUSTER_CONVERGENCE_KEY =
"org.apache.mahout.clustering.canopy.convergence";
-
+
+ // keys used by Driver, Mapper, Combiner & Reducer
+ String DISTANCE_MEASURE_KEY =
"org.apache.mahout.clustering.canopy.measure";
+ String KERNEL_PROFILE_KEY =
"org.apache.mahout.clustering.canopy.kernelprofile";
+ String T1_KEY = "org.apache.mahout.clustering.canopy.t1";
+ String T2_KEY = "org.apache.mahout.clustering.canopy.t2";
+ String CONTROL_PATH_KEY = "org.apache.mahout.clustering.control.path";
+ String CLUSTER_CONVERGENCE_KEY =
"org.apache.mahout.clustering.canopy.convergence";
+ String CLUSTER_POINTS_KEY =
"org.apache.mahout.clustering.meanshift.clusterPointsKey";
+
}
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyDriver.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyDriver.java?rev=1149369&r1=1149368&r2=1149369&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyDriver.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyDriver.java
Thu Jul 21 21:13:02 2011
@@ -57,19 +57,27 @@ import org.slf4j.LoggerFactory;
import com.google.common.io.Closeables;
+/**
+ * This class implements the driver for Mean Shift Canopy clustering
+ *
+ */
public class MeanShiftCanopyDriver extends AbstractJob {
-
+
+ public static final String MAPRED_REDUCE_TASKS = "mapred.reduce.tasks";
+
private static final Logger log = LoggerFactory
.getLogger(MeanShiftCanopyDriver.class);
-
+
public static final String INPUT_IS_CANOPIES_OPTION = "inputIsCanopies";
+
public static final String STATE_IN_KEY =
"org.apache.mahout.clustering.meanshift.stateInKey";
+
private static final String CONTROL_CONVERGED = "control/converged";
-
+
public static void main(String[] args) throws Exception {
ToolRunner.run(new Configuration(), new MeanShiftCanopyDriver(), args);
}
-
+
@Override
public int run(String[] args) throws Exception {
addInputOption();
@@ -84,11 +92,11 @@ public class MeanShiftCanopyDriver exten
addOption(DefaultOptionCreator.t2Option().create());
addOption(DefaultOptionCreator.clusteringOption().create());
addOption(DefaultOptionCreator.methodOption().create());
-
+
if (parseArguments(args) == null) {
return -1;
}
-
+
Path input = getInputPath();
Path output = getOutputPath();
if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
@@ -107,17 +115,17 @@ public class MeanShiftCanopyDriver exten
boolean runSequential = getOption(DefaultOptionCreator.METHOD_OPTION)
.equalsIgnoreCase(DefaultOptionCreator.SEQUENTIAL_METHOD);
ClassLoader ccl = Thread.currentThread().getContextClassLoader();
- DistanceMeasure measure = ccl.loadClass(measureClass)
- .asSubclass(DistanceMeasure.class).newInstance();
+ DistanceMeasure measure = ccl.loadClass(measureClass).asSubclass(
+ DistanceMeasure.class).newInstance();
IKernelProfile kernelProfile = ccl.loadClass(kernelProfileClass)
.asSubclass(IKernelProfile.class).newInstance();
run(getConf(), input, output, measure, kernelProfile, t1, t2,
convergenceDelta, maxIterations, inputIsCanopies, runClustering,
runSequential);
-
+
return 0;
}
-
+
/**
* Run the job where the input format can be either Vectors or Canopies. If
* requested, cluster the input data using the computed Canopies
@@ -160,16 +168,17 @@ public class MeanShiftCanopyDriver exten
} else {
createCanopyFromVectors(conf, input, clustersIn, measure, runSequential);
}
-
+
Path clustersOut = buildClusters(conf, clustersIn, output, measure,
- kernelProfile, t1, t2, convergenceDelta, maxIterations, runSequential);
+ kernelProfile, t1, t2, convergenceDelta, maxIterations, runSequential,
+ runClustering);
if (runClustering) {
clusterData(inputIsCanopies ? input : new Path(output,
Cluster.INITIAL_CLUSTERS_DIR), clustersOut, new Path(output,
Cluster.CLUSTERED_POINTS_DIR), runSequential);
}
}
-
+
/**
* Convert input vectors to MeanShiftCanopies for further processing
*/
@@ -182,7 +191,7 @@ public class MeanShiftCanopyDriver exten
createCanopyFromVectorsMR(conf, input, output, measure);
}
}
-
+
/**
* Convert vectors to MeanShiftCanopies sequentially
*
@@ -206,15 +215,15 @@ public class MeanShiftCanopyDriver exten
try {
for (VectorWritable value : new
SequenceFileValueIterable<VectorWritable>(
s.getPath(), conf)) {
- writer.append(new Text(),
- MeanShiftCanopy.initialCanopy(value.get(), id++, measure));
+ writer.append(new Text(), MeanShiftCanopy.initialCanopy(value.get(),
+ id++, measure));
}
} finally {
Closeables.closeQuietly(writer);
}
}
}
-
+
/**
* Convert vectors to MeanShiftCanopies using Hadoop
*/
@@ -231,16 +240,16 @@ public class MeanShiftCanopyDriver exten
job.setNumReduceTasks(0);
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
-
+
FileInputFormat.setInputPaths(job, input);
FileOutputFormat.setOutputPath(job, output);
-
+
if (!job.waitForCompletion(true)) {
throw new InterruptedException(
"Mean Shift createCanopyFromVectorsMR failed on input " + input);
}
}
-
+
/**
* Iterate over the input clusters to produce the next cluster directories
for
* each iteration
@@ -265,30 +274,33 @@ public class MeanShiftCanopyDriver exten
* an int number of iterations
* @param runSequential
* if true run in sequential execution mode
+ * @param runClustering
+ * if true accumulate merged clusters for subsequent clustering step
*/
public static Path buildClusters(Configuration conf, Path clustersIn,
Path output, DistanceMeasure measure, IKernelProfile kernelProfile,
double t1, double t2, double convergenceDelta, int maxIterations,
- boolean runSequential) throws IOException, InterruptedException,
- ClassNotFoundException {
+ boolean runSequential, boolean runClustering) throws IOException,
+ InterruptedException, ClassNotFoundException {
if (runSequential) {
return buildClustersSeq(clustersIn, output, measure, kernelProfile, t1,
- t2, convergenceDelta, maxIterations);
+ t2, convergenceDelta, maxIterations, runClustering);
} else {
return buildClustersMR(conf, clustersIn, output, measure, kernelProfile,
- t1, t2, convergenceDelta, maxIterations);
+ t1, t2, convergenceDelta, maxIterations, runClustering);
}
}
-
+
/**
* Build new clusters sequentially
*
*/
private static Path buildClustersSeq(Path clustersIn, Path output,
DistanceMeasure measure, IKernelProfile aKernelProfile, double t1,
- double t2, double convergenceDelta, int maxIterations) throws
IOException {
+ double t2, double convergenceDelta, int maxIterations,
+ boolean runClustering) throws IOException {
MeanShiftCanopyClusterer clusterer = new MeanShiftCanopyClusterer(measure,
- aKernelProfile, t1, t2, convergenceDelta);
+ aKernelProfile, t1, t2, convergenceDelta, runClustering);
List<MeanShiftCanopy> clusters = Lists.newArrayList();
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(clustersIn.toUri(), conf);
@@ -296,7 +308,7 @@ public class MeanShiftCanopyDriver exten
clustersIn, PathType.LIST, PathFilters.logsCRCFilter(), conf)) {
clusterer.mergeCanopy(value, clusters);
}
- boolean[] converged = {false};
+ boolean[] converged = { false };
int iteration = 1;
while (!converged[0] && iteration <= maxIterations) {
log.info("Mean Shift Iteration: {}", iteration);
@@ -308,11 +320,11 @@ public class MeanShiftCanopyDriver exten
for (MeanShiftCanopy cluster : clusters) {
log.debug(
"Writing Cluster:{} center:{} numPoints:{} radius:{} to: {}",
- new Object[] {cluster.getId(),
+ new Object[] { cluster.getId(),
AbstractCluster.formatVector(cluster.getCenter(), null),
cluster.getNumPoints(),
AbstractCluster.formatVector(cluster.getRadius(), null),
- clustersOut.getName()});
+ clustersOut.getName() });
writer.append(new Text(cluster.getIdentifier()), cluster);
}
} finally {
@@ -323,33 +335,43 @@ public class MeanShiftCanopyDriver exten
}
return clustersIn;
}
-
+
/**
* Build new clusters using Hadoop
+ *
*/
private static Path buildClustersMR(Configuration conf, Path clustersIn,
Path output, DistanceMeasure measure, IKernelProfile aKernelProfile,
- double t1, double t2, double convergenceDelta, int maxIterations)
- throws IOException, InterruptedException, ClassNotFoundException {
+ double t1, double t2, double convergenceDelta, int maxIterations,
+ boolean runClustering) throws IOException, InterruptedException,
+ ClassNotFoundException {
// iterate until the clusters converge
boolean converged = false;
int iteration = 1;
while (!converged && iteration <= maxIterations) {
- log.info("Mean Shift Iteration {}", iteration);
+ int numReducers = Integer.valueOf(conf.get(MAPRED_REDUCE_TASKS, "1"));
+ log.info("Mean Shift Iteration: {}, numReducers {}", new Object[] {
+ iteration, numReducers });
// point the output to a new directory per iteration
Path clustersOut = new Path(output, Cluster.CLUSTERS_DIR + iteration);
Path controlOut = new Path(output, CONTROL_CONVERGED);
runIterationMR(conf, clustersIn, clustersOut, controlOut, measure
.getClass().getName(), aKernelProfile.getClass().getName(), t1, t2,
- convergenceDelta);
+ convergenceDelta, runClustering);
converged = FileSystem.get(new Configuration()).exists(controlOut);
// now point the input to the old output directory
clustersIn = clustersOut;
iteration++;
+ // decrease the number of reducers if it is > 1 to cross-pollenate
+ // map sets
+ if (numReducers > 1) {
+ numReducers--;
+ conf.set(MAPRED_REDUCE_TASKS, String.valueOf(numReducers));
+ }
}
return clustersIn;
}
-
+
/**
* Run an iteration using Hadoop
*
@@ -371,33 +393,35 @@ public class MeanShiftCanopyDriver exten
* the T2 distance threshold
* @param convergenceDelta
* the double convergence criteria
+ * @param runClustering
+ * if true accumulate merged clusters for subsequent clustering step
*/
private static void runIterationMR(Configuration conf, Path input,
Path output, Path control, String measureClassName,
String kernelProfileClassName, double t1, double t2,
- double convergenceDelta) throws IOException, InterruptedException,
- ClassNotFoundException {
-
+ double convergenceDelta, boolean runClustering) throws IOException,
+ InterruptedException, ClassNotFoundException {
+
conf.set(MeanShiftCanopyConfigKeys.DISTANCE_MEASURE_KEY, measureClassName);
conf.set(MeanShiftCanopyConfigKeys.KERNEL_PROFILE_KEY,
kernelProfileClassName);
- conf.set(MeanShiftCanopyConfigKeys.CLUSTER_CONVERGENCE_KEY,
- String.valueOf(convergenceDelta));
+ conf.set(MeanShiftCanopyConfigKeys.CLUSTER_CONVERGENCE_KEY, String
+ .valueOf(convergenceDelta));
conf.set(MeanShiftCanopyConfigKeys.T1_KEY, String.valueOf(t1));
conf.set(MeanShiftCanopyConfigKeys.T2_KEY, String.valueOf(t2));
conf.set(MeanShiftCanopyConfigKeys.CONTROL_PATH_KEY, control.toString());
-
+ conf.set(MeanShiftCanopyConfigKeys.CLUSTER_POINTS_KEY, String
+ .valueOf(runClustering));
Job job = new Job(conf,
"Mean Shift Driver running runIteration over input: " + input);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(MeanShiftCanopy.class);
-
+
FileInputFormat.setInputPaths(job, input);
FileOutputFormat.setOutputPath(job, output);
-
+
job.setMapperClass(MeanShiftCanopyMapper.class);
job.setReducerClass(MeanShiftCanopyReducer.class);
- job.setNumReduceTasks(1);
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setJarByClass(MeanShiftCanopyDriver.class);
@@ -406,7 +430,7 @@ public class MeanShiftCanopyDriver exten
+ input);
}
}
-
+
/**
* Run the job using supplied arguments
*
@@ -428,7 +452,7 @@ public class MeanShiftCanopyDriver exten
clusterDataMR(input, clustersIn, output);
}
}
-
+
/**
* Cluster the data sequentially
*/
@@ -450,7 +474,7 @@ public class MeanShiftCanopyDriver exten
output, "part-m-" + part++), IntWritable.class,
WeightedVectorWritable.class);
try {
- for (Pair<Writable,MeanShiftCanopy> record : new
SequenceFileIterable<Writable,MeanShiftCanopy>(
+ for (Pair<Writable, MeanShiftCanopy> record : new
SequenceFileIterable<Writable, MeanShiftCanopy>(
s.getPath(), conf)) {
MeanShiftCanopy canopy = record.getSecond();
MeanShiftCanopy closest = MeanShiftCanopyClusterer
@@ -463,7 +487,7 @@ public class MeanShiftCanopyDriver exten
}
}
}
-
+
/**
* Cluster the data using Hadoop
*/
@@ -476,15 +500,15 @@ public class MeanShiftCanopyDriver exten
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(WeightedVectorWritable.class);
job.setMapperClass(MeanShiftCanopyClusterMapper.class);
-
+
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setNumReduceTasks(0);
job.setJarByClass(MeanShiftCanopyDriver.class);
-
+
FileInputFormat.setInputPaths(job, input);
FileOutputFormat.setOutputPath(job, output);
-
+
if (!job.waitForCompletion(true)) {
throw new InterruptedException(
"Mean Shift Clustering failed on clustersIn " + clustersIn);
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyMapper.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyMapper.java?rev=1149369&r1=1149368&r2=1149369&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyMapper.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyMapper.java
Thu Jul 21 21:13:02 2011
@@ -21,6 +21,8 @@ import java.io.IOException;
import java.util.Collection;
import com.google.common.collect.Lists;
+
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
@@ -31,10 +33,14 @@ public class MeanShiftCanopyMapper exten
private MeanShiftCanopyClusterer clusterer;
+private Integer numReducers;
+
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
super.setup(context);
- clusterer = new MeanShiftCanopyClusterer(context.getConfiguration());
+ Configuration conf = context.getConfiguration();
+ clusterer = new MeanShiftCanopyClusterer(conf);
+ numReducers =
Integer.valueOf(conf.get(MeanShiftCanopyDriver.MAPRED_REDUCE_TASKS, "1"));
}
@Override
@@ -45,9 +51,14 @@ public class MeanShiftCanopyMapper exten
@Override
protected void cleanup(Context context) throws IOException,
InterruptedException {
+ int reducer = 0;
for (MeanShiftCanopy canopy : canopies) {
clusterer.shiftToMean(canopy);
- context.write(new Text("canopy"), canopy);
+ context.write(new Text(String.valueOf(reducer)), canopy);
+ reducer++;
+ if (reducer >= numReducers){
+ reducer=0;
+ }
}
super.cleanup(context);
}
Modified:
mahout/trunk/core/src/test/java/org/apache/mahout/clustering/meanshift/TestMeanShift.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/clustering/meanshift/TestMeanShift.java?rev=1149369&r1=1149368&r2=1149369&view=diff
==============================================================================
---
mahout/trunk/core/src/test/java/org/apache/mahout/clustering/meanshift/TestMeanShift.java
(original)
+++
mahout/trunk/core/src/test/java/org/apache/mahout/clustering/meanshift/TestMeanShift.java
Thu Jul 21 21:13:02 2011
@@ -51,14 +51,15 @@ import org.junit.Before;
import org.junit.Test;
public final class TestMeanShift extends MahoutTestCase {
-
+
private Vector[] raw = null;
-
+
// DistanceMeasure manhattanDistanceMeasure = new ManhattanDistanceMeasure();
-
+
private final DistanceMeasure euclideanDistanceMeasure = new
EuclideanDistanceMeasure();
+
private final IKernelProfile kernelProfile = new TriangularKernelProfile();
-
+
/**
* Print the canopies to the transcript
*
@@ -70,7 +71,7 @@ public final class TestMeanShift extends
System.out.println(canopy.asFormatString(null));
}
}
-
+
/**
* Print a graphical representation of the clustered image points as a 10x10
* character mask
@@ -93,7 +94,7 @@ public final class TestMeanShift extends
System.out.println(anOut);
}
}
-
+
private List<MeanShiftCanopy> getInitialCanopies() {
int nextCanopyId = 0;
List<MeanShiftCanopy> canopies = Lists.newArrayList();
@@ -103,7 +104,7 @@ public final class TestMeanShift extends
}
return canopies;
}
-
+
@Override
@Before
public void setUp() throws Exception {
@@ -124,7 +125,7 @@ public final class TestMeanShift extends
}
}
}
-
+
/**
* Story: User can exercise the reference implementation to verify that the
* test datapoints are clustered in a reasonable manner.
@@ -133,7 +134,7 @@ public final class TestMeanShift extends
public void testReferenceImplementation() {
MeanShiftCanopyClusterer clusterer = new MeanShiftCanopyClusterer(
new EuclideanDistanceMeasure(), new TriangularKernelProfile(), 4.0,
- 1.0, 0.5);
+ 1.0, 0.5, true);
List<MeanShiftCanopy> canopies = Lists.newArrayList();
// add all points to the canopies
int nextCanopyId = 0;
@@ -156,7 +157,7 @@ public final class TestMeanShift extends
System.out.println(iter++);
}
}
-
+
/**
* Test the MeanShiftCanopyClusterer's reference implementation. Should
* produce the same final output as above.
@@ -169,7 +170,7 @@ public final class TestMeanShift extends
printCanopies(canopies);
printImage(canopies);
}
-
+
/**
* Story: User can produce initial canopy centers using a
* EuclideanDistanceMeasure and a CanopyMapper/Combiner which clusters input
@@ -178,7 +179,7 @@ public final class TestMeanShift extends
@Test
public void testCanopyMapperEuclidean() throws Exception {
MeanShiftCanopyClusterer clusterer = new MeanShiftCanopyClusterer(
- euclideanDistanceMeasure, kernelProfile, 4, 1, 0.5);
+ euclideanDistanceMeasure, kernelProfile, 4, 1, 0.5, true);
// get the initial canopies
List<MeanShiftCanopy> canopies = getInitialCanopies();
// build the reference set
@@ -188,7 +189,7 @@ public final class TestMeanShift extends
clusterer.mergeCanopy(new MeanShiftCanopy(aRaw, nextCanopyId++,
euclideanDistanceMeasure), refCanopies);
}
-
+
Configuration conf = new Configuration();
conf.set(MeanShiftCanopyConfigKeys.DISTANCE_MEASURE_KEY,
"org.apache.mahout.common.distance.EuclideanDistanceMeasure");
@@ -197,49 +198,52 @@ public final class TestMeanShift extends
conf.set(MeanShiftCanopyConfigKeys.T1_KEY, "4");
conf.set(MeanShiftCanopyConfigKeys.T2_KEY, "1");
conf.set(MeanShiftCanopyConfigKeys.CLUSTER_CONVERGENCE_KEY, "0.5");
-
+
// map the data
MeanShiftCanopyMapper mapper = new MeanShiftCanopyMapper();
- DummyRecordWriter<Text,MeanShiftCanopy> mapWriter = new
DummyRecordWriter<Text,MeanShiftCanopy>();
- Mapper<WritableComparable<?>,MeanShiftCanopy,Text,MeanShiftCanopy>.Context
mapContext = DummyRecordWriter
+ DummyRecordWriter<Text, MeanShiftCanopy> mapWriter = new
DummyRecordWriter<Text, MeanShiftCanopy>();
+ Mapper<WritableComparable<?>, MeanShiftCanopy, Text,
MeanShiftCanopy>.Context mapContext = DummyRecordWriter
.build(mapper, conf, mapWriter);
mapper.setup(mapContext);
for (MeanShiftCanopy canopy : canopies) {
mapper.map(new Text(), canopy, mapContext);
}
mapper.cleanup(mapContext);
-
+
// now verify the output
assertEquals("Number of map results", 1, mapWriter.getData().size());
- List<MeanShiftCanopy> data = mapWriter.getValue(new Text("canopy"));
+ List<MeanShiftCanopy> data = mapWriter.getValue(new Text("0"));
assertEquals("Number of canopies", refCanopies.size(), data.size());
-
+
// add all points to the reference canopies
- Map<String,MeanShiftCanopy> refCanopyMap = Maps.newHashMap();
+ Map<String, MeanShiftCanopy> refCanopyMap = Maps.newHashMap();
for (MeanShiftCanopy canopy : refCanopies) {
clusterer.shiftToMean(canopy);
refCanopyMap.put(canopy.getIdentifier(), canopy);
}
// build a map of the combiner output
- Map<String,MeanShiftCanopy> canopyMap = Maps.newHashMap();
+ Map<String, MeanShiftCanopy> canopyMap = Maps.newHashMap();
for (MeanShiftCanopy d : data) {
canopyMap.put(d.getIdentifier(), d);
}
// compare the maps
- for (Map.Entry<String,MeanShiftCanopy> stringMeanShiftCanopyEntry :
refCanopyMap
+ for (Map.Entry<String, MeanShiftCanopy> stringMeanShiftCanopyEntry :
refCanopyMap
.entrySet()) {
MeanShiftCanopy ref = stringMeanShiftCanopyEntry.getValue();
-
+
MeanShiftCanopy canopy = canopyMap.get((ref.isConverged() ? "MSV-"
- : "MSC-") + ref.getId());
+ : "MSC-")
+ + ref.getId());
assertEquals("ids", ref.getId(), canopy.getId());
assertEquals("centers(" + ref.getIdentifier() + ')', ref.getCenter()
.asFormatString(), canopy.getCenter().asFormatString());
assertEquals("bound points", ref.getBoundPoints().toList().size(), canopy
.getBoundPoints().toList().size());
+ assertEquals("num bound points", ref.getNumPoints(), canopy
+ .getNumPoints());
}
}
-
+
/**
* Story: User can produce final canopy centers using a
* EuclideanDistanceMeasure and a CanopyReducer which clusters input centroid
@@ -248,7 +252,7 @@ public final class TestMeanShift extends
@Test
public void testCanopyReducerEuclidean() throws Exception {
MeanShiftCanopyClusterer clusterer = new MeanShiftCanopyClusterer(
- euclideanDistanceMeasure, kernelProfile, 4, 1, 0.5);
+ euclideanDistanceMeasure, kernelProfile, 4, 1, 0.5, true);
// get the initial canopies
List<MeanShiftCanopy> canopies = getInitialCanopies();
// build the mapper output reference set
@@ -269,7 +273,7 @@ public final class TestMeanShift extends
for (MeanShiftCanopy canopy : reducerReference) {
clusterer.shiftToMean(canopy);
}
-
+
Configuration conf = new Configuration();
conf.set(MeanShiftCanopyConfigKeys.DISTANCE_MEASURE_KEY,
"org.apache.mahout.common.distance.EuclideanDistanceMeasure");
@@ -279,46 +283,47 @@ public final class TestMeanShift extends
conf.set(MeanShiftCanopyConfigKeys.T2_KEY, "1");
conf.set(MeanShiftCanopyConfigKeys.CLUSTER_CONVERGENCE_KEY, "0.5");
conf.set(MeanShiftCanopyConfigKeys.CONTROL_PATH_KEY, "output/control");
-
+
MeanShiftCanopyMapper mapper = new MeanShiftCanopyMapper();
- DummyRecordWriter<Text,MeanShiftCanopy> mapWriter = new
DummyRecordWriter<Text,MeanShiftCanopy>();
- Mapper<WritableComparable<?>,MeanShiftCanopy,Text,MeanShiftCanopy>.Context
mapContext = DummyRecordWriter
+ DummyRecordWriter<Text, MeanShiftCanopy> mapWriter = new
DummyRecordWriter<Text, MeanShiftCanopy>();
+ Mapper<WritableComparable<?>, MeanShiftCanopy, Text,
MeanShiftCanopy>.Context mapContext = DummyRecordWriter
.build(mapper, conf, mapWriter);
mapper.setup(mapContext);
-
+
// map the data
for (MeanShiftCanopy canopy : canopies) {
mapper.map(new Text(), canopy, mapContext);
}
mapper.cleanup(mapContext);
-
+
assertEquals("Number of map results", 1, mapWriter.getData().size());
// now reduce the mapper output
MeanShiftCanopyReducer reducer = new MeanShiftCanopyReducer();
- DummyRecordWriter<Text,MeanShiftCanopy> reduceWriter = new
DummyRecordWriter<Text,MeanShiftCanopy>();
- Reducer<Text,MeanShiftCanopy,Text,MeanShiftCanopy>.Context reduceContext =
DummyRecordWriter
+ DummyRecordWriter<Text, MeanShiftCanopy> reduceWriter = new
DummyRecordWriter<Text, MeanShiftCanopy>();
+ Reducer<Text, MeanShiftCanopy, Text, MeanShiftCanopy>.Context
reduceContext = DummyRecordWriter
.build(reducer, conf, reduceWriter, Text.class, MeanShiftCanopy.class);
reducer.setup(reduceContext);
- reducer.reduce(new Text("canopy"), mapWriter.getValue(new Text("canopy")),
+ reducer.reduce(new Text("0"), mapWriter.getValue(new Text("0")),
reduceContext);
reducer.cleanup(reduceContext);
-
+
// now verify the output
assertEquals("Number of canopies", reducerReference.size(), reduceWriter
.getKeys().size());
-
+
// add all points to the reference canopy maps
- Map<String,MeanShiftCanopy> reducerReferenceMap = Maps.newHashMap();
+ Map<String, MeanShiftCanopy> reducerReferenceMap = Maps.newHashMap();
for (MeanShiftCanopy canopy : reducerReference) {
reducerReferenceMap.put(canopy.getIdentifier(), canopy);
}
// compare the maps
- for (Map.Entry<String,MeanShiftCanopy> mapEntry : reducerReferenceMap
+ for (Map.Entry<String, MeanShiftCanopy> mapEntry : reducerReferenceMap
.entrySet()) {
MeanShiftCanopy refCanopy = mapEntry.getValue();
-
+
List<MeanShiftCanopy> values = reduceWriter.getValue(new Text((refCanopy
- .isConverged() ? "MSV-" : "MSC-") + refCanopy.getId()));
+ .isConverged() ? "MSV-" : "MSC-")
+ + refCanopy.getId()));
assertEquals("values", 1, values.size());
MeanShiftCanopy reducerCanopy = values.get(0);
assertEquals("ids", refCanopy.getId(), reducerCanopy.getId());
@@ -331,9 +336,11 @@ public final class TestMeanShift extends
reducerCenter);
assertEquals("bound points", refCanopy.getBoundPoints().toList().size(),
reducerCanopy.getBoundPoints().toList().size());
+ assertEquals("num bound points", refCanopy.getNumPoints(), reducerCanopy
+ .getNumPoints());
}
}
-
+
/**
* Story: User can produce final point clustering using a Hadoop map/reduce
* job and a EuclideanDistanceMeasure.
@@ -356,7 +363,7 @@ public final class TestMeanShift extends
Path output = getTestTempDirPath("output");
// MeanShiftCanopyDriver.runJob(input, output,
// EuclideanDistanceMeasure.class.getName(), 4, 1, 0.5, 10, false, false);
- String[] args = {optKey(DefaultOptionCreator.INPUT_OPTION),
+ String[] args = { optKey(DefaultOptionCreator.INPUT_OPTION),
getTestTempDirPath("testdata").toString(),
optKey(DefaultOptionCreator.OUTPUT_OPTION), output.toString(),
optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION),
@@ -368,7 +375,7 @@ public final class TestMeanShift extends
optKey(DefaultOptionCreator.CLUSTERING_OPTION),
optKey(DefaultOptionCreator.MAX_ITERATIONS_OPTION), "7",
optKey(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION), "0.2",
- optKey(DefaultOptionCreator.OVERWRITE_OPTION)};
+ optKey(DefaultOptionCreator.OVERWRITE_OPTION) };
ToolRunner.run(conf, new MeanShiftCanopyDriver(), args);
Path outPart = new Path(output, "clusters-4/part-r-00000");
long count = HadoopUtil.countRecords(outPart, conf);
@@ -379,11 +386,12 @@ public final class TestMeanShift extends
// now test the initial clusters to ensure the type of their centers has
// been retained
while (iterator.hasNext()) {
- Cluster canopy = (Cluster) iterator.next();
+ MeanShiftCanopy canopy = (MeanShiftCanopy) iterator.next();
assertTrue(canopy.getCenter() instanceof DenseVector);
+ assertFalse(canopy.getBoundPoints().isEmpty());
}
}
-
+
/**
* Story: User can produce final point clustering using a Hadoop map/reduce
* job and a EuclideanDistanceMeasure.
@@ -407,7 +415,7 @@ public final class TestMeanShift extends
System.out.println("Output Path: " + output);
// MeanShiftCanopyDriver.runJob(input, output,
// EuclideanDistanceMeasure.class.getName(), 4, 1, 0.5, 10, false, false);
- String[] args = {optKey(DefaultOptionCreator.INPUT_OPTION),
+ String[] args = { optKey(DefaultOptionCreator.INPUT_OPTION),
getTestTempDirPath("testdata").toString(),
optKey(DefaultOptionCreator.OUTPUT_OPTION), output.toString(),
optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION),
@@ -421,10 +429,107 @@ public final class TestMeanShift extends
optKey(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION), "0.2",
optKey(DefaultOptionCreator.OVERWRITE_OPTION),
optKey(DefaultOptionCreator.METHOD_OPTION),
- DefaultOptionCreator.SEQUENTIAL_METHOD};
+ DefaultOptionCreator.SEQUENTIAL_METHOD };
+ ToolRunner.run(new Configuration(), new MeanShiftCanopyDriver(), args);
+ Path outPart = new Path(output, "clusters-7/part-r-00000");
+ long count = HadoopUtil.countRecords(outPart, conf);
+ assertEquals("count", 3, count);
+ }
+
+ /**
+ * Story: User can produce final canopy centers, without the -cl point
+ * clustering option, using a Hadoop map/reduce job and a EuclideanDistanceMeasure.
+ */
+ @Test
+ public void testCanopyEuclideanMRJobNoClustering() throws Exception {
+ Path input = getTestTempDirPath("testdata");
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.get(input.toUri(), conf);
+ Collection<VectorWritable> points = Lists.newArrayList();
+ for (Vector v : raw) {
+ points.add(new VectorWritable(v));
+ }
+ ClusteringTestUtils.writePointsToFile(points,
+ getTestTempFilePath("testdata/file1"), fs, conf);
+ ClusteringTestUtils.writePointsToFile(points,
+ getTestTempFilePath("testdata/file2"), fs, conf);
+ // now run the Job using the run() command. Other tests can continue to use
+ // runJob().
+ Path output = getTestTempDirPath("output");
+ // MeanShiftCanopyDriver.runJob(input, output,
+ // EuclideanDistanceMeasure.class.getName(), 4, 1, 0.5, 10, false, false);
+ String[] args = { optKey(DefaultOptionCreator.INPUT_OPTION),
+ getTestTempDirPath("testdata").toString(),
+ optKey(DefaultOptionCreator.OUTPUT_OPTION), output.toString(),
+ optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION),
+ EuclideanDistanceMeasure.class.getName(),
+ optKey(DefaultOptionCreator.KERNEL_PROFILE_OPTION),
+ TriangularKernelProfile.class.getName(),
+ optKey(DefaultOptionCreator.T1_OPTION), "4",
+ optKey(DefaultOptionCreator.T2_OPTION), "1",
+ optKey(DefaultOptionCreator.MAX_ITERATIONS_OPTION), "7",
+ optKey(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION), "0.2",
+ optKey(DefaultOptionCreator.OVERWRITE_OPTION) };
+ ToolRunner.run(conf, new MeanShiftCanopyDriver(), args);
+ Path outPart = new Path(output, "clusters-3/part-r-00000");
+ long count = HadoopUtil.countRecords(outPart, conf);
+ assertEquals("count", 3, count);
+ Iterator<?> iterator = new SequenceFileValueIterator<Writable>(outPart,
+ true, conf);
+ while (iterator.hasNext()) {
+ MeanShiftCanopy canopy = (MeanShiftCanopy) iterator.next();
+ assertTrue(canopy.getCenter() instanceof DenseVector);
+ assertEquals(1, canopy.getBoundPoints().size());
+ }
+ }
+
+ /**
+ * Story: User can produce final canopy centers, without the -cl point
+ * clustering option, using sequential execution and a EuclideanDistanceMeasure.
+ */
+ @Test
+ public void testCanopyEuclideanSeqJobNoClustering() throws Exception {
+ Path input = getTestTempDirPath("testdata");
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.get(input.toUri(), conf);
+ Collection<VectorWritable> points = Lists.newArrayList();
+ for (Vector v : raw) {
+ points.add(new VectorWritable(v));
+ }
+ ClusteringTestUtils.writePointsToFile(points,
+ getTestTempFilePath("testdata/file1"), fs, conf);
+ ClusteringTestUtils.writePointsToFile(points,
+ getTestTempFilePath("testdata/file2"), fs, conf);
+ // now run the Job using the run() command. Other tests can continue to use
+ // runJob().
+ Path output = getTestTempDirPath("output");
+ System.out.println("Output Path: " + output);
+ // MeanShiftCanopyDriver.runJob(input, output,
+ // EuclideanDistanceMeasure.class.getName(), 4, 1, 0.5, 10, false, false);
+ String[] args = { optKey(DefaultOptionCreator.INPUT_OPTION),
+ getTestTempDirPath("testdata").toString(),
+ optKey(DefaultOptionCreator.OUTPUT_OPTION), output.toString(),
+ optKey(DefaultOptionCreator.DISTANCE_MEASURE_OPTION),
+ EuclideanDistanceMeasure.class.getName(),
+ optKey(DefaultOptionCreator.KERNEL_PROFILE_OPTION),
+ TriangularKernelProfile.class.getName(),
+ optKey(DefaultOptionCreator.T1_OPTION), "4",
+ optKey(DefaultOptionCreator.T2_OPTION), "1",
+ optKey(DefaultOptionCreator.MAX_ITERATIONS_OPTION), "7",
+ optKey(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION), "0.2",
+ optKey(DefaultOptionCreator.OVERWRITE_OPTION),
+ optKey(DefaultOptionCreator.METHOD_OPTION),
+ DefaultOptionCreator.SEQUENTIAL_METHOD };
ToolRunner.run(new Configuration(), new MeanShiftCanopyDriver(), args);
Path outPart = new Path(output, "clusters-7/part-r-00000");
long count = HadoopUtil.countRecords(outPart, conf);
assertEquals("count", 3, count);
+ Iterator<?> iterator = new SequenceFileValueIterator<Writable>(outPart,
+ true, conf);
+ while (iterator.hasNext()) {
+ MeanShiftCanopy canopy = (MeanShiftCanopy) iterator.next();
+ assertTrue(canopy.getCenter() instanceof DenseVector);
+ assertEquals(1, canopy.getBoundPoints().size());
+ }
}
}
Modified:
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayMeanShift.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayMeanShift.java?rev=1149369&r1=1149368&r2=1149369&view=diff
==============================================================================
---
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayMeanShift.java
(original)
+++
mahout/trunk/examples/src/main/java/org/apache/mahout/clustering/display/DisplayMeanShift.java
Thu Jul 21 21:13:02 2011
@@ -72,7 +72,7 @@ public class DisplayMeanShift extends Di
int i = 0;
for (Cluster cluster : CLUSTERS.get(CLUSTERS.size() - 1)) {
MeanShiftCanopy canopy = (MeanShiftCanopy) cluster;
- if (canopy.getBoundPoints().toList().size() >= significance
+ if (canopy.getMass() >= significance
* DisplayClustering.SAMPLE_DATA.size()) {
g2.setColor(COLORS[Math.min(i++, DisplayClustering.COLORS.length -
1)]);
int count = 0;