Author: jeastman
Date: Fri Oct  8 19:01:34 2010
New Revision: 1005958

URL: http://svn.apache.org/viewvc?rev=1005958&view=rev
Log:
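MAHOUT-504:
- Added job-completion checks so that the clustering drivers throw (and stop iterating) when a Hadoop job fails
- Fixed canopy ClusterMapper initialization on Hadoop when the clusters directory also contains _log files (only part* files are now read)
- All synthetic control examples now run on a Hadoop cluster
- All unit tests run successfully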

Modified:
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/ClusterMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletDriver.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/lda/LDADriver.java
    mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyDriver.java

Modified: 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java?rev=1005958&r1=1005957&r2=1005958&view=diff
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java
 (original)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/CanopyDriver.java
 Fri Oct  8 19:01:34 2010
@@ -248,7 +248,9 @@ public class CanopyDriver extends Abstra
     FileInputFormat.addInputPath(job, input);
     Path canopyOutputDir = new Path(output, Cluster.CLUSTERS_DIR + '0');
     FileOutputFormat.setOutputPath(job, canopyOutputDir);
-    job.waitForCompletion(true);
+    if (job.waitForCompletion(true) == false) {
+      throw new InterruptedException("Canopy Job failed processing " + 
input.toString());
+    }
     return canopyOutputDir;
   }
 
@@ -347,7 +349,9 @@ public class CanopyDriver extends Abstra
     FileOutputFormat.setOutputPath(job, outPath);
     HadoopUtil.overwriteOutput(outPath);
 
-    job.waitForCompletion(true);
+    if (job.waitForCompletion(true) == false) {
+      throw new InterruptedException("Canopy Clustering failed processing " + 
canopies.toString());
+    }
   }
 
 }

Modified: 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/ClusterMapper.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/ClusterMapper.java?rev=1005958&r1=1005957&r2=1005958&view=diff
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/ClusterMapper.java
 (original)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/canopy/ClusterMapper.java
 Fri Oct  8 19:01:34 2010
@@ -25,7 +25,9 @@ import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
@@ -63,16 +65,23 @@ public class ClusterMapper extends Mappe
 
     canopyClusterer = new CanopyClusterer(context.getConfiguration());
 
-    Configuration configuration = context.getConfiguration();
-    String canopyPath = configuration.get(CanopyConfigKeys.CANOPY_PATH_KEY);
+    Configuration conf = context.getConfiguration();
+    String clustersIn = conf.get(CanopyConfigKeys.CANOPY_PATH_KEY);
 
-    if ((canopyPath != null) && (canopyPath.length() > 0)) {
+    // filter out the files
+    PathFilter clusterFileFilter = new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.getName().startsWith("part");
+      }
+    };
+    if ((clustersIn != null) && (clustersIn.length() > 0)) {
       try {
-        Path path = new Path(canopyPath);
-        FileSystem fs = FileSystem.get(path.toUri(), configuration);
-        FileStatus[] files = fs.listStatus(path);
+        Path clusterPath = new Path(clustersIn,"*");
+        FileSystem fs = clusterPath.getFileSystem(conf);
+        FileStatus[] files = 
fs.listStatus(FileUtil.stat2Paths(fs.globStatus(clusterPath, 
clusterFileFilter)), clusterFileFilter);
         for (FileStatus file : files) {
-          SequenceFile.Reader reader = new SequenceFile.Reader(fs, 
file.getPath(), configuration);
+          SequenceFile.Reader reader = new SequenceFile.Reader(fs, 
file.getPath(), conf);
           try {
             Text key = new Text();
             Canopy value = new Canopy();

Modified: 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletDriver.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletDriver.java?rev=1005958&r1=1005957&r2=1005958&view=diff
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletDriver.java
 (original)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/dirichlet/DirichletDriver.java
 Fri Oct  8 19:01:34 2010
@@ -372,7 +372,9 @@ public class DirichletDriver extends Abs
     FileInputFormat.addInputPath(job, input);
     FileOutputFormat.setOutputPath(job, stateOut);
 
-    job.waitForCompletion(true);
+    if (job.waitForCompletion(true) == false) {
+      throw new InterruptedException("Dirichlet Iteration failed processing " 
+ stateIn.toString());
+    }
   }
 
   /**
@@ -561,6 +563,8 @@ public class DirichletDriver extends Abs
     FileInputFormat.addInputPath(job, input);
     FileOutputFormat.setOutputPath(job, output);
 
-    job.waitForCompletion(true);
+    if (job.waitForCompletion(true) == false) {
+      throw new InterruptedException("Dirichlet Clustering failed processing " 
+ stateIn.toString());
+    }
   }
 }

Modified: 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java?rev=1005958&r1=1005957&r2=1005958&view=diff
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java
 (original)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/fuzzykmeans/FuzzyKMeansDriver.java
 Fri Oct  8 19:01:34 2010
@@ -71,8 +71,7 @@ public class FuzzyKMeansDriver extends A
     addOption(DefaultOptionCreator.distanceMeasureOption().create());
     addOption(DefaultOptionCreator.clustersInOption()
         .withDescription("The input centroids, as Vectors.  Must be a 
SequenceFile of Writable, Cluster/Canopy.  "
-            + "If k is also specified, then a random set of vectors will be 
selected"
-            + " and written out to this path first")
+            + "If k is also specified, then a random set of vectors will be 
selected" + " and written out to this path first")
         .create());
     addOption(DefaultOptionCreator.numClustersOption()
         .withDescription("The k in k-Means.  If specified, then a random 
selection of k Vectors will be chosen"
@@ -114,8 +113,7 @@ public class FuzzyKMeansDriver extends A
           .get(DefaultOptionCreator.NUM_CLUSTERS_OPTION)), measure);
     }
     boolean runClustering = hasOption(DefaultOptionCreator.CLUSTERING_OPTION);
-    boolean runSequential =
-        
getOption(DefaultOptionCreator.METHOD_OPTION).equalsIgnoreCase(DefaultOptionCreator.SEQUENTIAL_METHOD);
+    boolean runSequential = 
getOption(DefaultOptionCreator.METHOD_OPTION).equalsIgnoreCase(DefaultOptionCreator.SEQUENTIAL_METHOD);
     run(getConf(),
         input,
         clusters,
@@ -166,8 +164,8 @@ public class FuzzyKMeansDriver extends A
                          boolean runClustering,
                          boolean emitMostLikely,
                          double threshold,
-                         boolean runSequential)
-    throws IOException, ClassNotFoundException, InterruptedException, 
InstantiationException, IllegalAccessException {
+                         boolean runSequential) throws IOException, 
ClassNotFoundException, InterruptedException,
+      InstantiationException, IllegalAccessException {
     Path clustersOut = buildClusters(new Configuration(),
                                      input,
                                      clustersIn,
@@ -209,6 +207,8 @@ public class FuzzyKMeansDriver extends A
    *          
http://en.wikipedia.org/wiki/Data_clustering#Fuzzy_c-means_clustering
    * 
    * @return true if the iteration successfully runs
+   * @throws ClassNotFoundException 
+   * @throws InterruptedException 
    */
   private static boolean runIteration(Configuration conf,
                                       Path input,
@@ -216,7 +216,7 @@ public class FuzzyKMeansDriver extends A
                                       Path clustersOut,
                                       String measureClass,
                                       double convergenceDelta,
-                                      float m) throws IOException {
+                                      float m) throws IOException, 
InterruptedException, ClassNotFoundException {
 
     conf.set(FuzzyKMeansConfigKeys.CLUSTER_PATH_KEY, clustersIn.toString());
     conf.set(FuzzyKMeansConfigKeys.DISTANCE_MEASURE_KEY, measureClass);
@@ -242,20 +242,11 @@ public class FuzzyKMeansDriver extends A
     FileInputFormat.addInputPath(job, input);
     FileOutputFormat.setOutputPath(job, clustersOut);
 
-    try {
-      job.waitForCompletion(true);
-      FileSystem fs = FileSystem.get(clustersOut.toUri(), conf);
-      return isConverged(clustersOut, conf, fs);
-    } catch (IOException e) {
-      log.warn(e.toString(), e);
-      return true;
-    } catch (InterruptedException e) {
-      log.warn(e.toString(), e);
-      return true;
-    } catch (ClassNotFoundException e) {
-      log.warn(e.toString(), e);
-      return true;
+    if (job.waitForCompletion(true) == false) {
+      throw new InterruptedException("Fuzzy K-Means Iteration failed 
processing " + clustersIn.toString());
     }
+    FileSystem fs = FileSystem.get(clustersOut.toUri(), conf);
+    return isConverged(clustersOut, conf, fs);
   }
 
   /**
@@ -293,10 +284,9 @@ public class FuzzyKMeansDriver extends A
                          boolean runClustering,
                          boolean emitMostLikely,
                          double threshold,
-                         boolean runSequential)
-    throws IOException, ClassNotFoundException, InterruptedException, 
InstantiationException, IllegalAccessException {
-    Path clustersOut =
-        buildClusters(conf, input, clustersIn, output, measure, 
convergenceDelta, maxIterations, m, runSequential);
+                         boolean runSequential) throws IOException, 
ClassNotFoundException, InterruptedException,
+      InstantiationException, IllegalAccessException {
+    Path clustersOut = buildClusters(conf, input, clustersIn, output, measure, 
convergenceDelta, maxIterations, m, runSequential);
     if (runClustering) {
       log.info("Clustering ");
       clusterData(input,
@@ -332,6 +322,8 @@ public class FuzzyKMeansDriver extends A
    * @param runSequential if true run in sequential execution mode
    * 
    * @return the Path of the final clusters directory
+   * @throws ClassNotFoundException 
+   * @throws InterruptedException 
    */
   public static Path buildClusters(Configuration conf,
                                    Path input,
@@ -341,8 +333,7 @@ public class FuzzyKMeansDriver extends A
                                    double convergenceDelta,
                                    int maxIterations,
                                    float m,
-                                   boolean runSequential)
-    throws IOException, InstantiationException, IllegalAccessException {
+                                   boolean runSequential) throws IOException, 
InstantiationException, IllegalAccessException, InterruptedException, 
ClassNotFoundException {
     if (runSequential) {
       return buildClustersSeq(input, clustersIn, output, measure, 
convergenceDelta, maxIterations, m);
     } else {
@@ -414,7 +405,7 @@ public class FuzzyKMeansDriver extends A
                                       DistanceMeasure measure,
                                       double convergenceDelta,
                                       int maxIterations,
-                                      float m) throws IOException {
+                                      float m) throws IOException, 
InterruptedException, ClassNotFoundException {
     boolean converged = false;
     int iteration = 1;
 
@@ -460,8 +451,8 @@ public class FuzzyKMeansDriver extends A
                                  float m,
                                  boolean emitMostLikely,
                                  double threshold,
-                                 boolean runSequential)
-    throws IOException, ClassNotFoundException, InterruptedException, 
InstantiationException, IllegalAccessException {
+                                 boolean runSequential) throws IOException, 
ClassNotFoundException, InterruptedException,
+      InstantiationException, IllegalAccessException {
     if (runSequential) {
       clusterDataSeq(input, clustersIn, output, measure, convergenceDelta, m);
     } else {
@@ -540,7 +531,9 @@ public class FuzzyKMeansDriver extends A
     job.setNumReduceTasks(0);
     job.setJarByClass(FuzzyKMeansDriver.class);
 
-    job.waitForCompletion(true);
+    if (job.waitForCompletion(true) == false) {
+      throw new InterruptedException("Fuzzy K-Means Clustering failed 
processing " + clustersIn.toString());
+    }
   }
 
   /**
@@ -568,8 +561,7 @@ public class FuzzyKMeansDriver extends A
       }
     };
 
-    FileStatus[] matches =
-        fs.listStatus(FileUtil.stat2Paths(fs.globStatus(clusterPath, 
clusterFileFilter)), clusterFileFilter);
+    FileStatus[] matches = 
fs.listStatus(FileUtil.stat2Paths(fs.globStatus(clusterPath, 
clusterFileFilter)), clusterFileFilter);
 
     for (FileStatus match : matches) {
       result.add(fs.makeQualified(match.getPath()));

Modified: 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java?rev=1005958&r1=1005957&r2=1005958&view=diff
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java
 (original)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/kmeans/KMeansDriver.java
 Fri Oct  8 19:01:34 2010
@@ -339,7 +339,9 @@ public class KMeansDriver extends Abstra
 
     job.setJarByClass(KMeansDriver.class);
     HadoopUtil.overwriteOutput(clustersOut);
-    job.waitForCompletion(true);
+    if (job.waitForCompletion(true) == false) {
+      throw new InterruptedException("K-Means Iteration failed processing " + 
clustersIn.toString());
+    }
     FileSystem fs = FileSystem.get(clustersOut.toUri(), conf);
 
     return isConverged(clustersOut, conf, fs);
@@ -482,6 +484,8 @@ public class KMeansDriver extends Abstra
     job.setNumReduceTasks(0);
     job.setJarByClass(KMeansDriver.class);
 
-    job.waitForCompletion(true);
+    if (job.waitForCompletion(true) == false) {
+      throw new InterruptedException("K-Means Clustering failed processing " + 
clustersIn.toString());
+    }
   }
 }

Modified: 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/lda/LDADriver.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/lda/LDADriver.java?rev=1005958&r1=1005957&r2=1005958&view=diff
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/lda/LDADriver.java 
(original)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/lda/LDADriver.java 
Fri Oct  8 19:01:34 2010
@@ -276,7 +276,9 @@ public final class LDADriver extends Abs
     job.setInputFormatClass(SequenceFileInputFormat.class);
     job.setJarByClass(LDADriver.class);
 
-    job.waitForCompletion(true);
+    if (job.waitForCompletion(true) == false) {
+      throw new InterruptedException("LDA Iteration failed processing " + 
stateIn.toString());
+    }
     return findLL(stateOut, conf);
   }
 }

Modified: 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyDriver.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyDriver.java?rev=1005958&r1=1005957&r2=1005958&view=diff
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyDriver.java
 (original)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/clustering/meanshift/MeanShiftCanopyDriver.java
 Fri Oct  8 19:01:34 2010
@@ -95,22 +95,11 @@ public class MeanShiftCanopyDriver exten
     double convergenceDelta = 
Double.parseDouble(getOption(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION));
     int maxIterations = 
Integer.parseInt(getOption(DefaultOptionCreator.MAX_ITERATIONS_OPTION));
     boolean inputIsCanopies = hasOption(INPUT_IS_CANOPIES_OPTION);
-    boolean runSequential =
-        
getOption(DefaultOptionCreator.METHOD_OPTION).equalsIgnoreCase(DefaultOptionCreator.SEQUENTIAL_METHOD);
+    boolean runSequential = 
getOption(DefaultOptionCreator.METHOD_OPTION).equalsIgnoreCase(DefaultOptionCreator.SEQUENTIAL_METHOD);
     ClassLoader ccl = Thread.currentThread().getContextClassLoader();
     DistanceMeasure measure = 
ccl.loadClass(measureClass).asSubclass(DistanceMeasure.class).newInstance();
 
-    run(getConf(),
-        input,
-        output,
-        measure,
-        t1,
-        t2,
-        convergenceDelta,
-        maxIterations,
-        inputIsCanopies,
-        runClustering,
-        runSequential);
+    run(getConf(), input, output, measure, t1, t2, convergenceDelta, 
maxIterations, inputIsCanopies, runClustering, runSequential);
     return 0;
   }
 
@@ -149,8 +138,8 @@ public class MeanShiftCanopyDriver exten
                   int maxIterations,
                   boolean inputIsCanopies,
                   boolean runClustering,
-                  boolean runSequential)
-    throws IOException, InterruptedException, ClassNotFoundException, 
InstantiationException, IllegalAccessException {
+                  boolean runSequential) throws IOException, 
InterruptedException, ClassNotFoundException, InstantiationException,
+      IllegalAccessException {
     Path clustersIn = new Path(output, Cluster.INITIAL_CLUSTERS_DIR);
     if (inputIsCanopies) {
       clustersIn = input;
@@ -158,8 +147,7 @@ public class MeanShiftCanopyDriver exten
       createCanopyFromVectors(conf, input, clustersIn, measure, runSequential);
     }
 
-    Path clustersOut =
-        buildClusters(conf, clustersIn, output, measure, t1, t2, 
convergenceDelta, maxIterations, runSequential);
+    Path clustersOut = buildClusters(conf, clustersIn, output, measure, t1, 
t2, convergenceDelta, maxIterations, runSequential);
     if (runClustering) {
       clusterData(conf,
                   inputIsCanopies ? input : new Path(output, 
Cluster.INITIAL_CLUSTERS_DIR),
@@ -187,8 +175,8 @@ public class MeanShiftCanopyDriver exten
                                              Path input,
                                              Path output,
                                              DistanceMeasure measure,
-                                             boolean runSequential)
-    throws IOException, InterruptedException, ClassNotFoundException, 
InstantiationException, IllegalAccessException {
+                                             boolean runSequential) throws 
IOException, InterruptedException,
+      ClassNotFoundException, InstantiationException, IllegalAccessException {
     if (runSequential) {
       createCanopyFromVectorsSeq(input, output, measure);
     } else {
@@ -203,8 +191,8 @@ public class MeanShiftCanopyDriver exten
    * @param output the Path to the initial clusters directory
    * @param measure the DistanceMeasure
    */
-  private static void createCanopyFromVectorsSeq(Path input, Path output, 
DistanceMeasure measure)
-    throws IOException, InstantiationException, IllegalAccessException {
+  private static void createCanopyFromVectorsSeq(Path input, Path output, 
DistanceMeasure measure) throws IOException,
+      InstantiationException, IllegalAccessException {
     Configuration conf = new Configuration();
     FileSystem fs = FileSystem.get(input.toUri(), conf);
     FileStatus[] status = fs.listStatus(input, new OutputLogFilter());
@@ -243,7 +231,7 @@ public class MeanShiftCanopyDriver exten
    * @throws ClassNotFoundException
    */
   private static void createCanopyFromVectorsMR(Configuration conf, Path 
input, Path output, DistanceMeasure measure)
-    throws IOException, InterruptedException, ClassNotFoundException {
+      throws IOException, InterruptedException, ClassNotFoundException {
     conf.set(KMeansConfigKeys.DISTANCE_MEASURE_KEY, 
measure.getClass().getName());
     Job job = new Job(conf);
     job.setJarByClass(MeanShiftCanopyDriver.class);
@@ -257,7 +245,9 @@ public class MeanShiftCanopyDriver exten
     FileInputFormat.setInputPaths(job, input);
     FileOutputFormat.setOutputPath(job, output);
 
-    job.waitForCompletion(true);
+    if (job.waitForCompletion(true) == false) {
+      throw new InterruptedException("Mean Shift createCanopyFromVectorsMR 
failed on input " + input.toString());
+    }
   }
 
   /**
@@ -288,8 +278,8 @@ public class MeanShiftCanopyDriver exten
                             double t2,
                             double convergenceDelta,
                             int maxIterations,
-                            boolean runSequential)
-    throws IOException, InterruptedException, ClassNotFoundException, 
InstantiationException, IllegalAccessException {
+                            boolean runSequential) throws IOException, 
InterruptedException, ClassNotFoundException,
+      InstantiationException, IllegalAccessException {
     if (runSequential) {
       return buildClustersSeq(clustersIn, output, measure, t1, t2, 
convergenceDelta, maxIterations);
     } else {
@@ -318,8 +308,7 @@ public class MeanShiftCanopyDriver exten
                                        double t1,
                                        double t2,
                                        double convergenceDelta,
-                                       int maxIterations)
-    throws IOException, InstantiationException, IllegalAccessException {
+                                       int maxIterations) throws IOException, 
InstantiationException, IllegalAccessException {
     MeanShiftCanopyClusterer clusterer = new MeanShiftCanopyClusterer(measure, 
t1, t2, convergenceDelta);
     List<MeanShiftCanopy> clusters = new ArrayList<MeanShiftCanopy>();
     Configuration conf = new Configuration();
@@ -351,13 +340,9 @@ public class MeanShiftCanopyDriver exten
                                                            
MeanShiftCanopy.class);
       try {
         for (MeanShiftCanopy cluster : clusters) {
-          log.debug("Writing Cluster:{} center:{} numPoints:{} radius:{} to: 
{}",
-                   new Object[] { cluster.getId(),
-                                  
AbstractCluster.formatVector(cluster.getCenter(), null),
-                                  cluster.getNumPoints(),
-                                  
AbstractCluster.formatVector(cluster.getRadius(), null),
-                                  clustersOut.getName()
-                   });
+          log.debug("Writing Cluster:{} center:{} numPoints:{} radius:{} to: 
{}", new Object[] { cluster.getId(),
+              AbstractCluster.formatVector(cluster.getCenter(), null), 
cluster.getNumPoints(),
+              AbstractCluster.formatVector(cluster.getRadius(), null), 
clustersOut.getName() });
           writer.append(new Text(cluster.getIdentifier()), cluster);
         }
       } finally {
@@ -392,8 +377,7 @@ public class MeanShiftCanopyDriver exten
                                       double t1,
                                       double t2,
                                       double convergenceDelta,
-                                      int maxIterations)
-    throws IOException, InterruptedException, ClassNotFoundException {
+                                      int maxIterations) throws IOException, 
InterruptedException, ClassNotFoundException {
     // iterate until the clusters converge
     boolean converged = false;
     int iteration = 1;
@@ -432,36 +416,37 @@ public class MeanShiftCanopyDriver exten
    *          the double convergence criteria
    */
   private static void runIterationMR(Configuration conf,
-                                   Path input,
-                                   Path output,
-                                   Path control,
-                                   String measureClassName,
-                                   double t1,
-                                   double t2,
-                                   double convergenceDelta)
-    throws IOException, InterruptedException, ClassNotFoundException {
-  
+                                     Path input,
+                                     Path output,
+                                     Path control,
+                                     String measureClassName,
+                                     double t1,
+                                     double t2,
+                                     double convergenceDelta) throws 
IOException, InterruptedException, ClassNotFoundException {
+
     conf.set(MeanShiftCanopyConfigKeys.DISTANCE_MEASURE_KEY, measureClassName);
     conf.set(MeanShiftCanopyConfigKeys.CLUSTER_CONVERGENCE_KEY, 
String.valueOf(convergenceDelta));
     conf.set(MeanShiftCanopyConfigKeys.T1_KEY, String.valueOf(t1));
     conf.set(MeanShiftCanopyConfigKeys.T2_KEY, String.valueOf(t2));
     conf.set(MeanShiftCanopyConfigKeys.CONTROL_PATH_KEY, control.toString());
-  
+
     Job job = new Job(conf);
-  
+
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(MeanShiftCanopy.class);
-  
+
     FileInputFormat.setInputPaths(job, input);
     FileOutputFormat.setOutputPath(job, output);
-  
+
     job.setMapperClass(MeanShiftCanopyMapper.class);
     job.setReducerClass(MeanShiftCanopyReducer.class);
     job.setNumReduceTasks(1);
     job.setInputFormatClass(SequenceFileInputFormat.class);
     job.setOutputFormatClass(SequenceFileOutputFormat.class);
     job.setJarByClass(MeanShiftCanopyDriver.class);
-    job.waitForCompletion(true);
+    if (job.waitForCompletion(true) == false) {
+      throw new InterruptedException("Mean Shift Iteration failed on input " + 
input.toString());
+    }
   }
 
   /**
@@ -477,7 +462,7 @@ public class MeanShiftCanopyDriver exten
    * @param runSequential if true run in sequential execution mode
    */
   public static void clusterData(Configuration conf, Path input, Path 
clustersIn, Path output, boolean runSequential)
-    throws IOException, InterruptedException, ClassNotFoundException, 
InstantiationException, IllegalAccessException {
+      throws IOException, InterruptedException, ClassNotFoundException, 
InstantiationException, IllegalAccessException {
     if (runSequential) {
       clusterDataSeq(input, clustersIn, output);
     } else {
@@ -495,8 +480,8 @@ public class MeanShiftCanopyDriver exten
    * @throws InstantiationException
    * @throws IllegalAccessException
    */
-  private static void clusterDataSeq(Path input, Path clustersIn, Path output)
-    throws IOException, InstantiationException, IllegalAccessException {
+  private static void clusterDataSeq(Path input, Path clustersIn, Path output) 
throws IOException, InstantiationException,
+      IllegalAccessException {
     Collection<MeanShiftCanopy> clusters = new ArrayList<MeanShiftCanopy>();
     Configuration conf = new Configuration();
     FileSystem fs = FileSystem.get(clustersIn.toUri(), conf);
@@ -550,8 +535,8 @@ public class MeanShiftCanopyDriver exten
    * @throws InterruptedException
    * @throws ClassNotFoundException
    */
-  private static void clusterDataMR(Path input, Path clustersIn, Path output)
-    throws IOException, InterruptedException, ClassNotFoundException {
+  private static void clusterDataMR(Path input, Path clustersIn, Path output) 
throws IOException, InterruptedException,
+      ClassNotFoundException {
     Configuration conf = new Configuration();
     conf.set(STATE_IN_KEY, clustersIn.toString());
     Job job = new Job(conf);
@@ -568,6 +553,8 @@ public class MeanShiftCanopyDriver exten
     FileInputFormat.setInputPaths(job, input);
     FileOutputFormat.setOutputPath(job, output);
 
-    job.waitForCompletion(true);
+    if (job.waitForCompletion(true) == false) {
+      throw new InterruptedException("Mean Shift Clustering failed on input " 
+ clustersIn.toString());
+    }
   }
 }


Reply via email to