Author: srowen
Date: Sun Apr 17 15:09:46 2011
New Revision: 1094154
URL: http://svn.apache.org/viewvc?rev=1094154&view=rev
Log:
Re-do some recent iterator-related changes that may have been clobbered in a
merge. Also fix an apparent typo in the backward-compatibility buildClusters()
overload, which passed output as both the input and the output path.
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java?rev=1094154&r1=1094153&r2=1094154&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java Sun Apr 17 15:09:46 2011
@@ -30,7 +30,6 @@ import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.OutputLogFilter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
@@ -47,7 +46,6 @@ import org.apache.mahout.common.distance
import org.apache.mahout.common.iterator.sequencefile.PathFilters;
import org.apache.mahout.common.iterator.sequencefile.PathType;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterable;
-import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterable;
import org.apache.mahout.math.VectorWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -91,20 +89,20 @@ public class CanopyDriver extends Abstra
double t1 = Double.parseDouble(getOption(DefaultOptionCreator.T1_OPTION));
double t2 = Double.parseDouble(getOption(DefaultOptionCreator.T2_OPTION));
double t3 = t1;
- if (hasOption(DefaultOptionCreator.T3_OPTION))
+ if (hasOption(DefaultOptionCreator.T3_OPTION)) {
t3 = Double.parseDouble(getOption(DefaultOptionCreator.T3_OPTION));
+ }
double t4 = t2;
- if (hasOption(DefaultOptionCreator.T4_OPTION))
+ if (hasOption(DefaultOptionCreator.T4_OPTION)) {
t4 = Double.parseDouble(getOption(DefaultOptionCreator.T4_OPTION));
+ }
boolean runClustering = hasOption(DefaultOptionCreator.CLUSTERING_OPTION);
- boolean runSequential = getOption(DefaultOptionCreator.METHOD_OPTION)
- .equalsIgnoreCase(DefaultOptionCreator.SEQUENTIAL_METHOD);
+ boolean runSequential = getOption(DefaultOptionCreator.METHOD_OPTION).equalsIgnoreCase(
+ DefaultOptionCreator.SEQUENTIAL_METHOD);
ClassLoader ccl = Thread.currentThread().getContextClassLoader();
- DistanceMeasure measure = ccl.loadClass(measureClass).asSubclass(
- DistanceMeasure.class).newInstance();
+ DistanceMeasure measure = ccl.loadClass(measureClass).asSubclass(DistanceMeasure.class).newInstance();
- run(conf, input, output, measure, t1, t2, t3, t4, runClustering,
- runSequential);
+ run(conf, input, output, measure, t1, t2, t3, t4, runClustering, runSequential);
return 0;
}
@@ -132,34 +130,37 @@ public class CanopyDriver extends Abstra
* cluster the input vectors if true
* @param runSequential
* execute sequentially if true
- * @throws IOException
- * @throws InterruptedException
- * @throws ClassNotFoundException
- * @throws InstantiationException
- * @throws IllegalAccessException
*/
- public static void run(Configuration conf, Path input, Path output,
- DistanceMeasure measure, double t1, double t2, double t3, double t4,
- boolean runClustering, boolean runSequential) throws IOException,
- InterruptedException, ClassNotFoundException, InstantiationException,
- IllegalAccessException {
- Path clustersOut = buildClusters(conf, input, output, measure, t1, t2, t3,
- t4, runSequential);
+ public static void run(Configuration conf,
+ Path input,
+ Path output,
+ DistanceMeasure measure,
+ double t1,
+ double t2,
+ double t3,
+ double t4,
+ boolean runClustering,
+ boolean runSequential)
+ throws IOException, InterruptedException, ClassNotFoundException, InstantiationException, IllegalAccessException {
+ Path clustersOut = buildClusters(conf, input, output, measure, t1, t2, t3, t4, runSequential);
if (runClustering) {
- clusterData(conf, input, clustersOut, output, measure, t1, t2,
- runSequential);
+ clusterData(conf, input, clustersOut, output, measure, t1, t2, runSequential);
}
}
/**
* Convenience method to provide backward compatibility
*/
- public static void run(Configuration conf, Path input, Path output,
- DistanceMeasure measure, double t1, double t2, boolean runClustering,
- boolean runSequential) throws IOException, InterruptedException,
- ClassNotFoundException, InstantiationException, IllegalAccessException {
- run(conf, input, output, measure, t1, t2, t1, t2, runClustering,
- runSequential);
+ public static void run(Configuration conf,
+ Path input,
+ Path output,
+ DistanceMeasure measure,
+ double t1,
+ double t2,
+ boolean runClustering,
+ boolean runSequential)
+ throws IOException, InterruptedException, ClassNotFoundException, InstantiationException, IllegalAccessException {
+ run(conf, input, output, measure, t1, t2, t1, t2, runClustering, runSequential);
}
/**
@@ -180,23 +181,29 @@ public class CanopyDriver extends Abstra
* @param runSequential
* execute sequentially if true
*/
- public static void run(Path input, Path output, DistanceMeasure measure,
- double t1, double t2, boolean runClustering, boolean runSequential)
- throws IOException, InterruptedException, ClassNotFoundException,
- InstantiationException, IllegalAccessException {
- run(new Configuration(), input, output, measure, t1, t2, runClustering,
- runSequential);
+ public static void run(Path input,
+ Path output,
+ DistanceMeasure measure,
+ double t1,
+ double t2,
+ boolean runClustering,
+ boolean runSequential)
+ throws IOException, InterruptedException, ClassNotFoundException, InstantiationException, IllegalAccessException {
+ run(new Configuration(), input, output, measure, t1, t2, runClustering, runSequential);
}
/**
* Convenience method for backwards compatibility
*/
- public static Path buildClusters(Configuration conf, Path input, Path output,
- DistanceMeasure measure, double t1, double t2, boolean runSequential)
- throws InstantiationException, IllegalAccessException, IOException,
- InterruptedException, ClassNotFoundException {
- return buildClusters(conf, output, output, measure, t1, t2, t1, t2,
- runSequential);
+ public static Path buildClusters(Configuration conf,
+ Path input,
+ Path output,
+ DistanceMeasure measure,
+ double t1,
+ double t2,
+ boolean runSequential)
+ throws IOException, InterruptedException, ClassNotFoundException {
+ return buildClusters(conf, input, output, measure, t1, t2, t1, t2, runSequential);
}
/**
@@ -223,13 +230,18 @@ public class CanopyDriver extends Abstra
* a boolean indicates to run the sequential (reference) algorithm
* @return the canopy output directory Path
*/
- public static Path buildClusters(Configuration conf, Path input, Path output,
- DistanceMeasure measure, double t1, double t2, double t3, double t4,
- boolean runSequential) throws InstantiationException,
- IllegalAccessException, IOException, InterruptedException,
- ClassNotFoundException {
- log.info("Build Clusters Input: {} Out: {} " + "Measure: {} t1: {} t2: {}",
- new Object[] { input, output, measure, t1, t2 });
+ public static Path buildClusters(Configuration conf,
+ Path input,
+ Path output,
+ DistanceMeasure measure,
+ double t1,
+ double t2,
+ double t3,
+ double t4,
+ boolean runSequential)
+ throws IOException, InterruptedException, ClassNotFoundException {
+ log.info("Build Clusters Input: {} Out: {} Measure: {} t1: {} t2: {}",
+ new Object[] {input, output, measure, t1, t2});
if (runSequential) {
return buildClustersSeq(input, output, measure, t1, t2);
} else {
@@ -253,42 +265,34 @@ public class CanopyDriver extends Abstra
* the double T2 distance metric
* @return the canopy output directory Path
*/
- private static Path buildClustersSeq(Path input, Path output,
- DistanceMeasure measure, double t1, double t2)
- throws InstantiationException, IllegalAccessException, IOException {
+ private static Path buildClustersSeq(Path input,
+ Path output,
+ DistanceMeasure measure,
+ double t1,
+ double t2) throws IOException {
CanopyClusterer clusterer = new CanopyClusterer(measure, t1, t2);
Collection<Canopy> canopies = new ArrayList<Canopy>();
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(input.toUri(), conf);
- FileStatus[] status = fs.listStatus(input, new OutputLogFilter());
- for (FileStatus s : status) {
- SequenceFile.Reader reader = new SequenceFile.Reader(fs, s.getPath(),
- conf);
- try {
- Writable key = reader.getKeyClass().asSubclass(Writable.class)
- .newInstance();
- VectorWritable vw = reader.getValueClass().asSubclass(
- VectorWritable.class).newInstance();
- while (reader.next(key, vw)) {
- clusterer.addPointToCanopies(vw.get(), canopies);
- vw = reader.getValueClass().asSubclass(VectorWritable.class)
- .newInstance();
- }
- } finally {
- reader.close();
- }
+
+ for (VectorWritable vw
+ : new SequenceFileDirValueIterable<VectorWritable>(input, PathType.LIST, PathFilters.logsCRCFilter(), conf)) {
+ clusterer.addPointToCanopies(vw.get(), canopies);
}
+
Path canopyOutputDir = new Path(output, Cluster.CLUSTERS_DIR + '0');
Path path = new Path(canopyOutputDir, "part-r-00000");
- SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path,
- Text.class, Canopy.class);
+ SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path, Text.class, Canopy.class);
try {
for (Canopy canopy : canopies) {
canopy.computeParameters();
- log.debug("Writing Canopy:" + canopy.getIdentifier() + " center:"
- + AbstractCluster.formatVector(canopy.getCenter(), null)
- + " numPoints:" + canopy.getNumPoints() + " radius:"
- + AbstractCluster.formatVector(canopy.getRadius(), null));
+ log.debug("Writing Canopy:{} center:{} numPoints:{} radius:{}",
+ new Object[] {
+ canopy.getIdentifier(),
+ AbstractCluster.formatVector(canopy.getCenter(), null),
+ canopy.getNumPoints(),
+ AbstractCluster.formatVector(canopy.getRadius(), null)
+ });
writer.append(new Text(canopy.getIdentifier()), canopy);
}
} finally {
@@ -319,19 +323,22 @@ public class CanopyDriver extends Abstra
*
* @return the canopy output directory Path
*/
- private static Path buildClustersMR(Configuration conf, Path input,
- Path output, DistanceMeasure measure, double t1, double t2, double t3,
- double t4) throws IOException, InterruptedException,
- ClassNotFoundException {
- conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY, measure.getClass()
- .getName());
+ private static Path buildClustersMR(Configuration conf,
+ Path input,
+ Path output,
+ DistanceMeasure measure,
+ double t1,
+ double t2,
+ double t3,
+ double t4)
+ throws IOException, InterruptedException, ClassNotFoundException {
+ conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY, measure.getClass().getName());
conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(t1));
conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(t2));
conf.set(CanopyConfigKeys.T3_KEY, String.valueOf(t3));
conf.set(CanopyConfigKeys.T4_KEY, String.valueOf(t4));
- Job job = new Job(conf, "Canopy Driver running buildClusters over input: "
- + input);
+ Job job = new Job(conf, "Canopy Driver running buildClusters over input: " + input);
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setMapperClass(CanopyMapper.class);
@@ -347,17 +354,20 @@ public class CanopyDriver extends Abstra
Path canopyOutputDir = new Path(output, Cluster.CLUSTERS_DIR + '0');
FileOutputFormat.setOutputPath(job, canopyOutputDir);
if (!job.waitForCompletion(true)) {
- throw new InterruptedException("Canopy Job failed processing "
- + input.toString());
+ throw new InterruptedException("Canopy Job failed processing " + input);
}
return canopyOutputDir;
}
- public static void clusterData(Configuration conf, Path points,
- Path canopies, Path output, DistanceMeasure measure, double t1,
- double t2, boolean runSequential) throws InstantiationException,
- IllegalAccessException, IOException, InterruptedException,
- ClassNotFoundException {
+ public static void clusterData(Configuration conf,
+ Path points,
+ Path canopies,
+ Path output,
+ DistanceMeasure measure,
+ double t1,
+ double t2,
+ boolean runSequential)
+ throws InstantiationException, IllegalAccessException, IOException, InterruptedException, ClassNotFoundException {
if (runSequential) {
clusterDataSeq(points, canopies, output, measure, t1, t2);
} else {
@@ -365,35 +375,27 @@ public class CanopyDriver extends Abstra
}
}
- private static void clusterDataSeq(Path points, Path canopies, Path output,
- DistanceMeasure measure, double t1, double t2)
- throws InstantiationException, IllegalAccessException, IOException {
+ private static void clusterDataSeq(Path points,
+ Path canopies,
+ Path output,
+ DistanceMeasure measure,
+ double t1,
+ double t2)
+ throws InstantiationException, IllegalAccessException, IOException {
CanopyClusterer clusterer = new CanopyClusterer(measure, t1, t2);
Collection<Canopy> clusters = new ArrayList<Canopy>();
Configuration conf = new Configuration();
- FileSystem fs = FileSystem.get(canopies.toUri(), conf);
- FileStatus[] status = fs.listStatus(canopies, new OutputLogFilter());
- for (FileStatus s : status) {
- SequenceFile.Reader reader = new SequenceFile.Reader(fs, s.getPath(),
- conf);
- try {
- Writable key = reader.getKeyClass().asSubclass(Writable.class)
- .newInstance();
- Canopy value = reader.getValueClass().asSubclass(Canopy.class)
- .newInstance();
- while (reader.next(key, value)) {
- clusters.add(value);
- value = reader.getValueClass().asSubclass(Canopy.class).newInstance();
- }
- } finally {
- reader.close();
- }
+
+ for (Canopy value
+ : new SequenceFileDirValueIterable<Canopy>(canopies, PathType.LIST, PathFilters.logsCRCFilter(), conf)) {
+ clusters.add(value);
}
+
// iterate over all points, assigning each to the closest canopy and
// outputing that clustering
- fs = FileSystem.get(points.toUri(), conf);
- status = fs.listStatus(points, new OutputLogFilter());
+ FileSystem fs = FileSystem.get(points.toUri(), conf);
+ FileStatus[] status = fs.listStatus(points, PathFilters.logsCRCFilter());
Path outPath = new Path(output, DEFAULT_CLUSTERED_POINTS_DIRECTORY);
int part = 0;
for (FileStatus s : status) {
@@ -421,17 +423,20 @@ public class CanopyDriver extends Abstra
}
}
- private static void clusterDataMR(Configuration conf, Path points,
- Path canopies, Path output, DistanceMeasure measure, double t1, double t2)
- throws IOException, InterruptedException, ClassNotFoundException {
- conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY, measure.getClass()
- .getName());
+ private static void clusterDataMR(Configuration conf,
+ Path points,
+ Path canopies,
+ Path output,
+ DistanceMeasure measure,
+ double t1,
+ double t2)
+ throws IOException, InterruptedException, ClassNotFoundException {
+ conf.set(CanopyConfigKeys.DISTANCE_MEASURE_KEY, measure.getClass().getName());
conf.set(CanopyConfigKeys.T1_KEY, String.valueOf(t1));
conf.set(CanopyConfigKeys.T2_KEY, String.valueOf(t2));
conf.set(CanopyConfigKeys.CANOPY_PATH_KEY, canopies.toString());
- Job job = new Job(conf, "Canopy Driver running clusterData over input: "
- + points);
+ Job job = new Job(conf, "Canopy Driver running clusterData over input: " + points);
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setMapperClass(ClusterMapper.class);
@@ -446,8 +451,7 @@ public class CanopyDriver extends Abstra
HadoopUtil.delete(conf, outPath);
if (!job.waitForCompletion(true)) {
- throw new InterruptedException("Canopy Clustering failed processing "
- + canopies.toString());
+ throw new InterruptedException("Canopy Clustering failed processing " + canopies);
}
}