Author: jeastman
Date: Fri Oct 8 19:01:34 2010
New Revision: 1005958
URL: http://svn.apache.org/viewvc?rev=1005958&view=rev
Log:
MAHOUT-504:
- Added job-completion checks (on the result of waitForCompletion) so drivers break out of their iterations when a job fails
- Fixed canopy cluster mapper initialization problem with _log files on Hadoop
- All synthetic control examples run on Hadoop cluster
- All unit tests run
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/ClusterMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletDriver.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/lda/LDADriver.java
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyDriver.java
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java?rev=1005958&r1=1005957&r2=1005958&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java Fri Oct 8 19:01:34 2010
@@ -248,7 +248,9 @@ public class CanopyDriver extends Abstra
FileInputFormat.addInputPath(job, input);
Path canopyOutputDir = new Path(output, Cluster.CLUSTERS_DIR + '0');
FileOutputFormat.setOutputPath(job, canopyOutputDir);
- job.waitForCompletion(true);
+ if (job.waitForCompletion(true) == false) {
+ throw new InterruptedException("Canopy Job failed processing " + input.toString());
+ }
return canopyOutputDir;
}
@@ -347,7 +349,9 @@ public class CanopyDriver extends Abstra
FileOutputFormat.setOutputPath(job, outPath);
HadoopUtil.overwriteOutput(outPath);
- job.waitForCompletion(true);
+ if (job.waitForCompletion(true) == false) {
+ throw new InterruptedException("Canopy Clustering failed processing " + canopies.toString());
+ }
}
}
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/ClusterMapper.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/ClusterMapper.java?rev=1005958&r1=1005957&r2=1005958&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/ClusterMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/ClusterMapper.java Fri Oct 8 19:01:34 2010
@@ -25,7 +25,9 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
@@ -63,16 +65,23 @@ public class ClusterMapper extends Mappe
canopyClusterer = new CanopyClusterer(context.getConfiguration());
- Configuration configuration = context.getConfiguration();
- String canopyPath = configuration.get(CanopyConfigKeys.CANOPY_PATH_KEY);
+ Configuration conf = context.getConfiguration();
+ String clustersIn = conf.get(CanopyConfigKeys.CANOPY_PATH_KEY);
- if ((canopyPath != null) && (canopyPath.length() > 0)) {
+ // filter out the files
+ PathFilter clusterFileFilter = new PathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ return path.getName().startsWith("part");
+ }
+ };
+ if ((clustersIn != null) && (clustersIn.length() > 0)) {
try {
- Path path = new Path(canopyPath);
- FileSystem fs = FileSystem.get(path.toUri(), configuration);
- FileStatus[] files = fs.listStatus(path);
+ Path clusterPath = new Path(clustersIn,"*");
+ FileSystem fs = clusterPath.getFileSystem(conf);
+ FileStatus[] files = fs.listStatus(FileUtil.stat2Paths(fs.globStatus(clusterPath, clusterFileFilter)), clusterFileFilter);
for (FileStatus file : files) {
- SequenceFile.Reader reader = new SequenceFile.Reader(fs, file.getPath(), configuration);
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, file.getPath(), conf);
try {
Text key = new Text();
Canopy value = new Canopy();
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletDriver.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletDriver.java?rev=1005958&r1=1005957&r2=1005958&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletDriver.java Fri Oct 8 19:01:34 2010
@@ -372,7 +372,9 @@ public class DirichletDriver extends Abs
FileInputFormat.addInputPath(job, input);
FileOutputFormat.setOutputPath(job, stateOut);
- job.waitForCompletion(true);
+ if (job.waitForCompletion(true) == false) {
+ throw new InterruptedException("Dirichlet Iteration failed processing " + stateIn.toString());
+ }
}
/**
@@ -561,6 +563,8 @@ public class DirichletDriver extends Abs
FileInputFormat.addInputPath(job, input);
FileOutputFormat.setOutputPath(job, output);
- job.waitForCompletion(true);
+ if (job.waitForCompletion(true) == false) {
+ throw new InterruptedException("Dirichlet Clustering failed processing " + stateIn.toString());
+ }
}
}
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java?rev=1005958&r1=1005957&r2=1005958&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java Fri Oct 8 19:01:34 2010
@@ -71,8 +71,7 @@ public class FuzzyKMeansDriver extends A
addOption(DefaultOptionCreator.distanceMeasureOption().create());
addOption(DefaultOptionCreator.clustersInOption()
.withDescription("The input centroids, as Vectors. Must be a SequenceFile of Writable, Cluster/Canopy. "
- + "If k is also specified, then a random set of vectors will be selected"
- + " and written out to this path first")
+ + "If k is also specified, then a random set of vectors will be selected" + " and written out to this path first")
.create());
addOption(DefaultOptionCreator.numClustersOption()
.withDescription("The k in k-Means. If specified, then a random selection of k Vectors will be chosen"
@@ -114,8 +113,7 @@ public class FuzzyKMeansDriver extends A
.get(DefaultOptionCreator.NUM_CLUSTERS_OPTION)), measure);
}
boolean runClustering = hasOption(DefaultOptionCreator.CLUSTERING_OPTION);
- boolean runSequential =
- getOption(DefaultOptionCreator.METHOD_OPTION).equalsIgnoreCase(DefaultOptionCreator.SEQUENTIAL_METHOD);
+ boolean runSequential = getOption(DefaultOptionCreator.METHOD_OPTION).equalsIgnoreCase(DefaultOptionCreator.SEQUENTIAL_METHOD);
run(getConf(),
input,
clusters,
@@ -166,8 +164,8 @@ public class FuzzyKMeansDriver extends A
boolean runClustering,
boolean emitMostLikely,
double threshold,
- boolean runSequential)
- throws IOException, ClassNotFoundException, InterruptedException, InstantiationException, IllegalAccessException {
+ boolean runSequential) throws IOException, ClassNotFoundException, InterruptedException,
+ InstantiationException, IllegalAccessException {
Path clustersOut = buildClusters(new Configuration(),
input,
clustersIn,
@@ -209,6 +207,8 @@ public class FuzzyKMeansDriver extends A
* http://en.wikipedia.org/wiki/Data_clustering#Fuzzy_c-means_clustering
*
* @return true if the iteration successfully runs
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
*/
private static boolean runIteration(Configuration conf,
Path input,
@@ -216,7 +216,7 @@ public class FuzzyKMeansDriver extends A
Path clustersOut,
String measureClass,
double convergenceDelta,
- float m) throws IOException {
+ float m) throws IOException, InterruptedException, ClassNotFoundException {
conf.set(FuzzyKMeansConfigKeys.CLUSTER_PATH_KEY, clustersIn.toString());
conf.set(FuzzyKMeansConfigKeys.DISTANCE_MEASURE_KEY, measureClass);
@@ -242,20 +242,11 @@ public class FuzzyKMeansDriver extends A
FileInputFormat.addInputPath(job, input);
FileOutputFormat.setOutputPath(job, clustersOut);
- try {
- job.waitForCompletion(true);
- FileSystem fs = FileSystem.get(clustersOut.toUri(), conf);
- return isConverged(clustersOut, conf, fs);
- } catch (IOException e) {
- log.warn(e.toString(), e);
- return true;
- } catch (InterruptedException e) {
- log.warn(e.toString(), e);
- return true;
- } catch (ClassNotFoundException e) {
- log.warn(e.toString(), e);
- return true;
+ if (job.waitForCompletion(true) == false) {
+ throw new InterruptedException("Fuzzy K-Means Iteration failed processing " + clustersIn.toString());
}
+ FileSystem fs = FileSystem.get(clustersOut.toUri(), conf);
+ return isConverged(clustersOut, conf, fs);
}
/**
@@ -293,10 +284,9 @@ public class FuzzyKMeansDriver extends A
boolean runClustering,
boolean emitMostLikely,
double threshold,
- boolean runSequential)
- throws IOException, ClassNotFoundException, InterruptedException, InstantiationException, IllegalAccessException {
- Path clustersOut =
- buildClusters(conf, input, clustersIn, output, measure, convergenceDelta, maxIterations, m, runSequential);
+ boolean runSequential) throws IOException, ClassNotFoundException, InterruptedException,
+ InstantiationException, IllegalAccessException {
+ Path clustersOut = buildClusters(conf, input, clustersIn, output, measure, convergenceDelta, maxIterations, m, runSequential);
if (runClustering) {
log.info("Clustering ");
clusterData(input,
@@ -332,6 +322,8 @@ public class FuzzyKMeansDriver extends A
* @param runSequential if true run in sequential execution mode
*
* @return the Path of the final clusters directory
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
*/
public static Path buildClusters(Configuration conf,
Path input,
@@ -341,8 +333,7 @@ public class FuzzyKMeansDriver extends A
double convergenceDelta,
int maxIterations,
float m,
- boolean runSequential)
- throws IOException, InstantiationException, IllegalAccessException {
+ boolean runSequential) throws IOException, InstantiationException, IllegalAccessException, InterruptedException, ClassNotFoundException {
if (runSequential) {
return buildClustersSeq(input, clustersIn, output, measure, convergenceDelta, maxIterations, m);
} else {
@@ -414,7 +405,7 @@ public class FuzzyKMeansDriver extends A
DistanceMeasure measure,
double convergenceDelta,
int maxIterations,
- float m) throws IOException {
+ float m) throws IOException, InterruptedException, ClassNotFoundException {
boolean converged = false;
int iteration = 1;
@@ -460,8 +451,8 @@ public class FuzzyKMeansDriver extends A
float m,
boolean emitMostLikely,
double threshold,
- boolean runSequential)
- throws IOException, ClassNotFoundException, InterruptedException, InstantiationException, IllegalAccessException {
+ boolean runSequential) throws IOException, ClassNotFoundException, InterruptedException,
+ InstantiationException, IllegalAccessException {
if (runSequential) {
clusterDataSeq(input, clustersIn, output, measure, convergenceDelta, m);
} else {
@@ -540,7 +531,9 @@ public class FuzzyKMeansDriver extends A
job.setNumReduceTasks(0);
job.setJarByClass(FuzzyKMeansDriver.class);
- job.waitForCompletion(true);
+ if (job.waitForCompletion(true) == false) {
+ throw new InterruptedException("Fuzzy K-Means Clustering failed processing " + clustersIn.toString());
+ }
}
/**
@@ -568,8 +561,7 @@ public class FuzzyKMeansDriver extends A
}
};
- FileStatus[] matches =
- fs.listStatus(FileUtil.stat2Paths(fs.globStatus(clusterPath, clusterFileFilter)), clusterFileFilter);
+ FileStatus[] matches = fs.listStatus(FileUtil.stat2Paths(fs.globStatus(clusterPath, clusterFileFilter)), clusterFileFilter);
for (FileStatus match : matches) {
result.add(fs.makeQualified(match.getPath()));
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java?rev=1005958&r1=1005957&r2=1005958&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java Fri Oct 8 19:01:34 2010
@@ -339,7 +339,9 @@ public class KMeansDriver extends Abstra
job.setJarByClass(KMeansDriver.class);
HadoopUtil.overwriteOutput(clustersOut);
- job.waitForCompletion(true);
+ if (job.waitForCompletion(true) == false) {
+ throw new InterruptedException("K-Means Iteration failed processing " + clustersIn.toString());
+ }
FileSystem fs = FileSystem.get(clustersOut.toUri(), conf);
return isConverged(clustersOut, conf, fs);
@@ -482,6 +484,8 @@ public class KMeansDriver extends Abstra
job.setNumReduceTasks(0);
job.setJarByClass(KMeansDriver.class);
- job.waitForCompletion(true);
+ if (job.waitForCompletion(true) == false) {
+ throw new InterruptedException("K-Means Clustering failed processing " + clustersIn.toString());
+ }
}
}
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/lda/LDADriver.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/lda/LDADriver.java?rev=1005958&r1=1005957&r2=1005958&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/lda/LDADriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/lda/LDADriver.java Fri Oct 8 19:01:34 2010
@@ -276,7 +276,9 @@ public final class LDADriver extends Abs
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setJarByClass(LDADriver.class);
- job.waitForCompletion(true);
+ if (job.waitForCompletion(true) == false) {
+ throw new InterruptedException("LDA Iteration failed processing " + stateIn.toString());
+ }
return findLL(stateOut, conf);
}
}
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyDriver.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyDriver.java?rev=1005958&r1=1005957&r2=1005958&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyDriver.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyDriver.java Fri Oct 8 19:01:34 2010
@@ -95,22 +95,11 @@ public class MeanShiftCanopyDriver exten
double convergenceDelta = Double.parseDouble(getOption(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION));
int maxIterations = Integer.parseInt(getOption(DefaultOptionCreator.MAX_ITERATIONS_OPTION));
boolean inputIsCanopies = hasOption(INPUT_IS_CANOPIES_OPTION);
- boolean runSequential =
- getOption(DefaultOptionCreator.METHOD_OPTION).equalsIgnoreCase(DefaultOptionCreator.SEQUENTIAL_METHOD);
+ boolean runSequential = getOption(DefaultOptionCreator.METHOD_OPTION).equalsIgnoreCase(DefaultOptionCreator.SEQUENTIAL_METHOD);
ClassLoader ccl = Thread.currentThread().getContextClassLoader();
DistanceMeasure measure = ccl.loadClass(measureClass).asSubclass(DistanceMeasure.class).newInstance();
- run(getConf(),
- input,
- output,
- measure,
- t1,
- t2,
- convergenceDelta,
- maxIterations,
- inputIsCanopies,
- runClustering,
- runSequential);
+ run(getConf(), input, output, measure, t1, t2, convergenceDelta, maxIterations, inputIsCanopies, runClustering, runSequential);
return 0;
}
@@ -149,8 +138,8 @@ public class MeanShiftCanopyDriver exten
int maxIterations,
boolean inputIsCanopies,
boolean runClustering,
- boolean runSequential)
- throws IOException, InterruptedException, ClassNotFoundException, InstantiationException, IllegalAccessException {
+ boolean runSequential) throws IOException, InterruptedException, ClassNotFoundException, InstantiationException,
+ IllegalAccessException {
Path clustersIn = new Path(output, Cluster.INITIAL_CLUSTERS_DIR);
if (inputIsCanopies) {
clustersIn = input;
@@ -158,8 +147,7 @@ public class MeanShiftCanopyDriver exten
createCanopyFromVectors(conf, input, clustersIn, measure, runSequential);
}
- Path clustersOut =
- buildClusters(conf, clustersIn, output, measure, t1, t2, convergenceDelta, maxIterations, runSequential);
+ Path clustersOut = buildClusters(conf, clustersIn, output, measure, t1, t2, convergenceDelta, maxIterations, runSequential);
if (runClustering) {
clusterData(conf,
inputIsCanopies ? input : new Path(output, Cluster.INITIAL_CLUSTERS_DIR),
@@ -187,8 +175,8 @@ public class MeanShiftCanopyDriver exten
Path input,
Path output,
DistanceMeasure measure,
- boolean runSequential)
- throws IOException, InterruptedException, ClassNotFoundException, InstantiationException, IllegalAccessException {
+ boolean runSequential) throws IOException, InterruptedException,
+ ClassNotFoundException, InstantiationException, IllegalAccessException {
if (runSequential) {
createCanopyFromVectorsSeq(input, output, measure);
} else {
@@ -203,8 +191,8 @@ public class MeanShiftCanopyDriver exten
* @param output the Path to the initial clusters directory
* @param measure the DistanceMeasure
*/
- private static void createCanopyFromVectorsSeq(Path input, Path output, DistanceMeasure measure)
- throws IOException, InstantiationException, IllegalAccessException {
+ private static void createCanopyFromVectorsSeq(Path input, Path output, DistanceMeasure measure) throws IOException,
+ InstantiationException, IllegalAccessException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(input.toUri(), conf);
FileStatus[] status = fs.listStatus(input, new OutputLogFilter());
@@ -243,7 +231,7 @@ public class MeanShiftCanopyDriver exten
* @throws ClassNotFoundException
*/
private static void createCanopyFromVectorsMR(Configuration conf, Path input, Path output, DistanceMeasure measure)
- throws IOException, InterruptedException, ClassNotFoundException {
+ throws IOException, InterruptedException, ClassNotFoundException {
conf.set(KMeansConfigKeys.DISTANCE_MEASURE_KEY, measure.getClass().getName());
Job job = new Job(conf);
job.setJarByClass(MeanShiftCanopyDriver.class);
@@ -257,7 +245,9 @@ public class MeanShiftCanopyDriver exten
FileInputFormat.setInputPaths(job, input);
FileOutputFormat.setOutputPath(job, output);
- job.waitForCompletion(true);
+ if (job.waitForCompletion(true) == false) {
+ throw new InterruptedException("Mean Shift createCanopyFromVectorsMR failed on input " + input.toString());
+ }
}
/**
@@ -288,8 +278,8 @@ public class MeanShiftCanopyDriver exten
double t2,
double convergenceDelta,
int maxIterations,
- boolean runSequential)
- throws IOException, InterruptedException, ClassNotFoundException, InstantiationException, IllegalAccessException {
+ boolean runSequential) throws IOException, InterruptedException, ClassNotFoundException,
+ InstantiationException, IllegalAccessException {
if (runSequential) {
return buildClustersSeq(clustersIn, output, measure, t1, t2, convergenceDelta, maxIterations);
} else {
@@ -318,8 +308,7 @@ public class MeanShiftCanopyDriver exten
double t1,
double t2,
double convergenceDelta,
- int maxIterations)
- throws IOException, InstantiationException, IllegalAccessException {
+ int maxIterations) throws IOException, InstantiationException, IllegalAccessException {
MeanShiftCanopyClusterer clusterer = new MeanShiftCanopyClusterer(measure, t1, t2, convergenceDelta);
List<MeanShiftCanopy> clusters = new ArrayList<MeanShiftCanopy>();
Configuration conf = new Configuration();
@@ -351,13 +340,9 @@ public class MeanShiftCanopyDriver exten
MeanShiftCanopy.class);
try {
for (MeanShiftCanopy cluster : clusters) {
- log.debug("Writing Cluster:{} center:{} numPoints:{} radius:{} to: {}",
- new Object[] { cluster.getId(),
- AbstractCluster.formatVector(cluster.getCenter(), null),
- cluster.getNumPoints(),
- AbstractCluster.formatVector(cluster.getRadius(), null),
- clustersOut.getName()
- });
+ log.debug("Writing Cluster:{} center:{} numPoints:{} radius:{} to: {}", new Object[] { cluster.getId(),
+ AbstractCluster.formatVector(cluster.getCenter(), null), cluster.getNumPoints(),
+ AbstractCluster.formatVector(cluster.getRadius(), null), clustersOut.getName() });
writer.append(new Text(cluster.getIdentifier()), cluster);
}
} finally {
@@ -392,8 +377,7 @@ public class MeanShiftCanopyDriver exten
double t1,
double t2,
double convergenceDelta,
- int maxIterations)
- throws IOException, InterruptedException, ClassNotFoundException {
+ int maxIterations) throws IOException, InterruptedException, ClassNotFoundException {
// iterate until the clusters converge
boolean converged = false;
int iteration = 1;
@@ -432,36 +416,37 @@ public class MeanShiftCanopyDriver exten
* the double convergence criteria
*/
private static void runIterationMR(Configuration conf,
- Path input,
- Path output,
- Path control,
- String measureClassName,
- double t1,
- double t2,
- double convergenceDelta)
- throws IOException, InterruptedException, ClassNotFoundException {
-
+ Path input,
+ Path output,
+ Path control,
+ String measureClassName,
+ double t1,
+ double t2,
+ double convergenceDelta) throws IOException, InterruptedException, ClassNotFoundException {
+
conf.set(MeanShiftCanopyConfigKeys.DISTANCE_MEASURE_KEY, measureClassName);
conf.set(MeanShiftCanopyConfigKeys.CLUSTER_CONVERGENCE_KEY, String.valueOf(convergenceDelta));
conf.set(MeanShiftCanopyConfigKeys.T1_KEY, String.valueOf(t1));
conf.set(MeanShiftCanopyConfigKeys.T2_KEY, String.valueOf(t2));
conf.set(MeanShiftCanopyConfigKeys.CONTROL_PATH_KEY, control.toString());
-
+
Job job = new Job(conf);
-
+
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(MeanShiftCanopy.class);
-
+
FileInputFormat.setInputPaths(job, input);
FileOutputFormat.setOutputPath(job, output);
-
+
job.setMapperClass(MeanShiftCanopyMapper.class);
job.setReducerClass(MeanShiftCanopyReducer.class);
job.setNumReduceTasks(1);
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setJarByClass(MeanShiftCanopyDriver.class);
- job.waitForCompletion(true);
+ if (job.waitForCompletion(true) == false) {
+ throw new InterruptedException("Mean Shift Iteration failed on input " + input.toString());
+ }
}
/**
@@ -477,7 +462,7 @@ public class MeanShiftCanopyDriver exten
* @param runSequential if true run in sequential execution mode
*/
public static void clusterData(Configuration conf, Path input, Path clustersIn, Path output, boolean runSequential)
- throws IOException, InterruptedException, ClassNotFoundException, InstantiationException, IllegalAccessException {
+ throws IOException, InterruptedException, ClassNotFoundException, InstantiationException, IllegalAccessException {
if (runSequential) {
clusterDataSeq(input, clustersIn, output);
} else {
@@ -495,8 +480,8 @@ public class MeanShiftCanopyDriver exten
* @throws InstantiationException
* @throws IllegalAccessException
*/
- private static void clusterDataSeq(Path input, Path clustersIn, Path output)
- throws IOException, InstantiationException, IllegalAccessException {
+ private static void clusterDataSeq(Path input, Path clustersIn, Path output) throws IOException, InstantiationException,
+ IllegalAccessException {
Collection<MeanShiftCanopy> clusters = new ArrayList<MeanShiftCanopy>();
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(clustersIn.toUri(), conf);
@@ -550,8 +535,8 @@ public class MeanShiftCanopyDriver exten
* @throws InterruptedException
* @throws ClassNotFoundException
*/
- private static void clusterDataMR(Path input, Path clustersIn, Path output)
- throws IOException, InterruptedException, ClassNotFoundException {
+ private static void clusterDataMR(Path input, Path clustersIn, Path output) throws IOException, InterruptedException,
+ ClassNotFoundException {
Configuration conf = new Configuration();
conf.set(STATE_IN_KEY, clustersIn.toString());
Job job = new Job(conf);
@@ -568,6 +553,8 @@ public class MeanShiftCanopyDriver exten
FileInputFormat.setInputPaths(job, input);
FileOutputFormat.setOutputPath(job, output);
- job.waitForCompletion(true);
+ if (job.waitForCompletion(true) == false) {
+ throw new InterruptedException("Mean Shift Clustering failed on input " + clustersIn.toString());
+ }
}
}