http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/evaluation/ClusterEvaluator.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/evaluation/ClusterEvaluator.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/evaluation/ClusterEvaluator.java new file mode 100644 index 0000000..757f38c --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/evaluation/ClusterEvaluator.java @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.clustering.evaluation; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.mahout.clustering.Cluster; +import org.apache.mahout.clustering.iterator.ClusterWritable; +import org.apache.mahout.common.ClassUtils; +import org.apache.mahout.common.distance.DistanceMeasure; +import org.apache.mahout.common.iterator.sequencefile.PathFilters; +import org.apache.mahout.common.iterator.sequencefile.PathType; +import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterable; +import org.apache.mahout.math.RandomAccessSparseVector; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.Vector.Element; +import org.apache.mahout.math.VectorWritable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +public class ClusterEvaluator { + + private static final Logger log = LoggerFactory.getLogger(ClusterEvaluator.class); + + private final Map<Integer,List<VectorWritable>> representativePoints; + + private final List<Cluster> clusters; + + private final DistanceMeasure measure; + + /** + * For testing only + * + * @param representativePoints + * a Map<Integer,List<VectorWritable>> of representative points keyed by clusterId + * @param clusters + * a Map<Integer,Cluster> of the clusters keyed by clusterId + * @param measure + * an appropriate DistanceMeasure + */ + public ClusterEvaluator(Map<Integer,List<VectorWritable>> representativePoints, List<Cluster> clusters, + DistanceMeasure measure) { + this.representativePoints = representativePoints; + this.clusters = clusters; + this.measure = measure; + } + + /** + * Initialize a new instance from job information + * + * @param conf + * a Configuration with appropriate parameters + * @param clustersIn + * a String path to the input clusters directory + */ + public ClusterEvaluator(Configuration conf, Path clustersIn) { + measure = ClassUtils + 
.instantiateAs(conf.get(RepresentativePointsDriver.DISTANCE_MEASURE_KEY), DistanceMeasure.class); + representativePoints = RepresentativePointsMapper.getRepresentativePoints(conf); + clusters = loadClusters(conf, clustersIn); + } + + /** + * Load the clusters from their sequence files + * + * @param clustersIn + * a String pathname to the directory containing input cluster files + * @return a List<Cluster> of the clusters + */ + private static List<Cluster> loadClusters(Configuration conf, Path clustersIn) { + List<Cluster> clusters = new ArrayList<>(); + for (ClusterWritable clusterWritable : new SequenceFileDirValueIterable<ClusterWritable>(clustersIn, PathType.LIST, + PathFilters.logsCRCFilter(), conf)) { + Cluster cluster = clusterWritable.getValue(); + clusters.add(cluster); + } + return clusters; + } + + /** + * Computes the inter-cluster density as defined in "Mahout In Action" + * + * @return the interClusterDensity + */ + public double interClusterDensity() { + double max = Double.NEGATIVE_INFINITY; + double min = Double.POSITIVE_INFINITY; + double sum = 0; + int count = 0; + Map<Integer,Vector> distances = interClusterDistances(); + for (Vector row : distances.values()) { + for (Element element : row.nonZeroes()) { + double d = element.get(); + min = Math.min(d, min); + max = Math.max(d, max); + sum += d; + count++; + } + } + double density = (sum / count - min) / (max - min); + log.info("Scaled Inter-Cluster Density = {}", density); + return density; + } + + /** + * Computes the inter-cluster distances + * + * @return a Map<Integer, Vector> + */ + public Map<Integer,Vector> interClusterDistances() { + Map<Integer,Vector> distances = new TreeMap<>(); + for (int i = 0; i < clusters.size(); i++) { + Cluster clusterI = clusters.get(i); + RandomAccessSparseVector row = new RandomAccessSparseVector(Integer.MAX_VALUE); + distances.put(clusterI.getId(), row); + for (int j = i + 1; j < clusters.size(); j++) { + Cluster clusterJ = clusters.get(j); + double d = measure.distance(clusterI.getCenter(), clusterJ.getCenter()); + row.set(clusterJ.getId(), d); + } + } + return distances; + } + + /** + * Computes the average intra-cluster density as the average of each cluster's intra-cluster density + * + * @return the average intraClusterDensity + */ + public double intraClusterDensity() { + double avgDensity = 0; + int count = 0; + for (Element elem : intraClusterDensities().nonZeroes()) { + double value = elem.get(); + if (!Double.isNaN(value)) { + avgDensity += value; + count++; + } + } + avgDensity = clusters.isEmpty() ? 
0 : avgDensity / count; + log.info("Average Intra-Cluster Density = {}", avgDensity); + return avgDensity; + } + + /** + * Computes the intra-cluster densities for all clusters as the average distance of the representative points from + * each other + * + * @return a Vector of the intraClusterDensity of the representativePoints by clusterId + */ + public Vector intraClusterDensities() { + Vector densities = new RandomAccessSparseVector(Integer.MAX_VALUE); + for (Cluster cluster : clusters) { + int count = 0; + double max = Double.NEGATIVE_INFINITY; + double min = Double.POSITIVE_INFINITY; + double sum = 0; + List<VectorWritable> repPoints = representativePoints.get(cluster.getId()); + for (int i = 0; i < repPoints.size(); i++) { + for (int j = i + 1; j < repPoints.size(); j++) { + Vector v1 = repPoints.get(i).get(); + Vector v2 = repPoints.get(j).get(); + double d = measure.distance(v1, v2); + min = Math.min(d, min); + max = Math.max(d, max); + sum += d; + count++; + } + } + double density = (sum / count - min) / (max - min); + densities.set(cluster.getId(), density); + log.info("Intra-Cluster Density[{}] = {}", cluster.getId(), density); + } + return densities; + } +}
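
The evaluator above can also be exercised directly in memory through the public constructor documented as "For testing only", without first running RepresentativePointsDriver. The following sketch is illustrative only and is not part of this patch: the ClusterEvaluatorExample class and its evaluate method are hypothetical names, and the caller is assumed to supply the clusters (for example from a k-means run) together with at least two representative points per cluster.

    package org.apache.mahout.clustering.evaluation;

    import java.util.List;
    import java.util.Map;

    import org.apache.mahout.clustering.Cluster;
    import org.apache.mahout.common.distance.DistanceMeasure;
    import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
    import org.apache.mahout.math.VectorWritable;

    public class ClusterEvaluatorExample {

      /**
       * Evaluates an in-memory clustering. Supply at least two clusters and at
       * least two representative points per cluster; otherwise the
       * (sum / count - min) / (max - min) scaling used by ClusterEvaluator
       * produces NaN.
       */
      public static void evaluate(List<Cluster> clusters,
                                  Map<Integer,List<VectorWritable>> representativePoints) {
        DistanceMeasure measure = new EuclideanDistanceMeasure();
        // Uses the public "for testing only" constructor instead of reading the
        // representativePoints-i directories written by RepresentativePointsDriver.
        ClusterEvaluator evaluator = new ClusterEvaluator(representativePoints, clusters, measure);
        // Scaled average of pairwise distances between cluster centers.
        double inter = evaluator.interClusterDensity();
        // Average of the per-cluster scaled densities of the representative points.
        double intra = evaluator.intraClusterDensity();
        System.out.println("inter-cluster density = " + inter
            + ", intra-cluster density = " + intra);
      }
    }
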
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsDriver.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsDriver.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsDriver.java new file mode 100644 index 0000000..2fe37ef --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsDriver.java @@ -0,0 +1,243 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.clustering.evaluation; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.hadoop.util.ToolRunner; +import org.apache.mahout.clustering.AbstractCluster; +import org.apache.mahout.clustering.Cluster; +import org.apache.mahout.clustering.classify.WeightedVectorWritable; +import org.apache.mahout.clustering.iterator.ClusterWritable; +import org.apache.mahout.common.AbstractJob; +import org.apache.mahout.common.ClassUtils; +import org.apache.mahout.common.Pair; +import org.apache.mahout.common.commandline.DefaultOptionCreator; +import org.apache.mahout.common.distance.DistanceMeasure; +import org.apache.mahout.common.iterator.sequencefile.PathFilters; +import org.apache.mahout.common.iterator.sequencefile.PathType; +import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable; +import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterable; +import org.apache.mahout.math.VectorWritable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class RepresentativePointsDriver extends AbstractJob { + + public static final String STATE_IN_KEY = "org.apache.mahout.clustering.stateIn"; + + public static final String DISTANCE_MEASURE_KEY = "org.apache.mahout.clustering.measure"; + + private static final Logger log = LoggerFactory.getLogger(RepresentativePointsDriver.class); + + private 
RepresentativePointsDriver() {} + + public static void main(String[] args) throws Exception { + ToolRunner.run(new Configuration(), new RepresentativePointsDriver(), args); + } + + @Override + public int run(String[] args) throws ClassNotFoundException, IOException, InterruptedException { + addInputOption(); + addOutputOption(); + addOption("clusteredPoints", "cp", "The path to the clustered points", true); + addOption(DefaultOptionCreator.distanceMeasureOption().create()); + addOption(DefaultOptionCreator.maxIterationsOption().create()); + addOption(DefaultOptionCreator.methodOption().create()); + if (parseArguments(args) == null) { + return -1; + } + + Path input = getInputPath(); + Path output = getOutputPath(); + String distanceMeasureClass = getOption(DefaultOptionCreator.DISTANCE_MEASURE_OPTION); + int maxIterations = Integer.parseInt(getOption(DefaultOptionCreator.MAX_ITERATIONS_OPTION)); + boolean runSequential = getOption(DefaultOptionCreator.METHOD_OPTION).equalsIgnoreCase( + DefaultOptionCreator.SEQUENTIAL_METHOD); + DistanceMeasure measure = ClassUtils.instantiateAs(distanceMeasureClass, DistanceMeasure.class); + Path clusteredPoints = new Path(getOption("clusteredPoints")); + run(getConf(), input, clusteredPoints, output, measure, maxIterations, runSequential); + return 0; + } + + /** + * Utility to print out representative points + * + * @param output + * the Path to the directory containing representativePoints-i folders + * @param numIterations + * the int number of iterations to print + */ + public static void printRepresentativePoints(Path output, int numIterations) { + for (int i = 0; i <= numIterations; i++) { + Path out = new Path(output, "representativePoints-" + i); + System.out.println("Representative Points for iteration " + i); + Configuration conf = new Configuration(); + for (Pair<IntWritable,VectorWritable> record : new SequenceFileDirIterable<IntWritable,VectorWritable>(out, + PathType.LIST, PathFilters.logsCRCFilter(), null, true, conf)) { + System.out.println("\tC-" + record.getFirst().get() + ": " + + AbstractCluster.formatVector(record.getSecond().get(), null)); + } + } + } + + public static void run(Configuration conf, Path clustersIn, Path clusteredPointsIn, Path output, + DistanceMeasure measure, int numIterations, boolean runSequential) throws IOException, InterruptedException, + ClassNotFoundException { + Path stateIn = new Path(output, "representativePoints-0"); + writeInitialState(stateIn, clustersIn); + + for (int iteration = 0; iteration < numIterations; iteration++) { + log.info("Representative Points Iteration {}", iteration); + // point the output to a new directory per iteration + Path stateOut = new Path(output, "representativePoints-" + (iteration + 1)); + runIteration(conf, clusteredPointsIn, stateIn, stateOut, measure, runSequential); + // now point the input to the old output directory + stateIn = stateOut; + } + + conf.set(STATE_IN_KEY, stateIn.toString()); + conf.set(DISTANCE_MEASURE_KEY, measure.getClass().getName()); + } + + private static void writeInitialState(Path output, Path clustersIn) throws IOException { + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.get(output.toUri(), conf); + for (FileStatus dir : fs.globStatus(clustersIn)) { + Path inPath = dir.getPath(); + for (FileStatus part : fs.listStatus(inPath, PathFilters.logsCRCFilter())) { + Path inPart = part.getPath(); + Path path = new Path(output, inPart.getName()); + try (SequenceFile.Writer writer = + new SequenceFile.Writer(fs, conf, path, 
IntWritable.class, VectorWritable.class)){ + for (ClusterWritable clusterWritable : new SequenceFileValueIterable<ClusterWritable>(inPart, true, conf)) { + Cluster cluster = clusterWritable.getValue(); + if (log.isDebugEnabled()) { + log.debug("C-{}: {}", cluster.getId(), AbstractCluster.formatVector(cluster.getCenter(), null)); + } + writer.append(new IntWritable(cluster.getId()), new VectorWritable(cluster.getCenter())); + } + } + } + } + } + + private static void runIteration(Configuration conf, Path clusteredPointsIn, Path stateIn, Path stateOut, + DistanceMeasure measure, boolean runSequential) throws IOException, InterruptedException, ClassNotFoundException { + if (runSequential) { + runIterationSeq(conf, clusteredPointsIn, stateIn, stateOut, measure); + } else { + runIterationMR(conf, clusteredPointsIn, stateIn, stateOut, measure); + } + } + + /** + * Run the job using supplied arguments as a sequential process + * + * @param conf + * the Configuration to use + * @param clusteredPointsIn + * the directory pathname for input points + * @param stateIn + * the directory pathname for input state + * @param stateOut + * the directory pathname for output state + * @param measure + * the DistanceMeasure to use + */ + private static void runIterationSeq(Configuration conf, Path clusteredPointsIn, Path stateIn, Path stateOut, + DistanceMeasure measure) throws IOException { + + Map<Integer,List<VectorWritable>> repPoints = RepresentativePointsMapper.getRepresentativePoints(conf, stateIn); + Map<Integer,WeightedVectorWritable> mostDistantPoints = new HashMap<>(); + FileSystem fs = FileSystem.get(clusteredPointsIn.toUri(), conf); + for (Pair<IntWritable,WeightedVectorWritable> record + : new SequenceFileDirIterable<IntWritable,WeightedVectorWritable>(clusteredPointsIn, PathType.LIST, + PathFilters.logsCRCFilter(), null, true, conf)) { + RepresentativePointsMapper.mapPoint(record.getFirst(), record.getSecond(), measure, repPoints, mostDistantPoints); + } + int part = 0; + try (SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, new Path(stateOut, "part-m-" + part++), + IntWritable.class, VectorWritable.class)){ + for (Entry<Integer,List<VectorWritable>> entry : repPoints.entrySet()) { + for (VectorWritable vw : entry.getValue()) { + writer.append(new IntWritable(entry.getKey()), vw); + } + } + } + try (SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, new Path(stateOut, "part-m-" + part++), + IntWritable.class, VectorWritable.class)){ + for (Map.Entry<Integer,WeightedVectorWritable> entry : mostDistantPoints.entrySet()) { + writer.append(new IntWritable(entry.getKey()), new VectorWritable(entry.getValue().getVector())); + } + } + } + + /** + * Run the job using supplied arguments as a Map/Reduce process + * + * @param conf + * the Configuration to use + * @param input + * the directory pathname for input points + * @param stateIn + * the directory pathname for input state + * @param stateOut + * the directory pathname for output state + * @param measure + * the DistanceMeasure to use + */ + private static void runIterationMR(Configuration conf, Path input, Path stateIn, Path stateOut, + DistanceMeasure measure) throws IOException, InterruptedException, ClassNotFoundException { + conf.set(STATE_IN_KEY, stateIn.toString()); + conf.set(DISTANCE_MEASURE_KEY, measure.getClass().getName()); + Job job = new Job(conf, "Representative Points Driver running over input: " + input); + job.setJarByClass(RepresentativePointsDriver.class); + job.setOutputKeyClass(IntWritable.class); + 
job.setOutputValueClass(VectorWritable.class); + job.setMapOutputKeyClass(IntWritable.class); + job.setMapOutputValueClass(WeightedVectorWritable.class); + + FileInputFormat.setInputPaths(job, input); + FileOutputFormat.setOutputPath(job, stateOut); + + job.setMapperClass(RepresentativePointsMapper.class); + job.setReducerClass(RepresentativePointsReducer.class); + job.setInputFormatClass(SequenceFileInputFormat.class); + job.setOutputFormatClass(SequenceFileOutputFormat.class); + + boolean succeeded = job.waitForCompletion(true); + if (!succeeded) { + throw new IllegalStateException("Job failed!"); + } + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsMapper.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsMapper.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsMapper.java new file mode 100644 index 0000000..0ae79ad --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsMapper.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.clustering.evaluation; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.mahout.clustering.classify.WeightedVectorWritable; +import org.apache.mahout.common.ClassUtils; +import org.apache.mahout.common.Pair; +import org.apache.mahout.common.distance.DistanceMeasure; +import org.apache.mahout.common.distance.EuclideanDistanceMeasure; +import org.apache.mahout.common.iterator.sequencefile.PathFilters; +import org.apache.mahout.common.iterator.sequencefile.PathType; +import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable; +import org.apache.mahout.math.VectorWritable; + +public class RepresentativePointsMapper + extends Mapper<IntWritable, WeightedVectorWritable, IntWritable, WeightedVectorWritable> { + + private Map<Integer, List<VectorWritable>> representativePoints; + private final Map<Integer, WeightedVectorWritable> mostDistantPoints = new HashMap<>(); + private DistanceMeasure measure = new EuclideanDistanceMeasure(); + + @Override + protected void cleanup(Context context) throws IOException, InterruptedException { + for (Map.Entry<Integer, WeightedVectorWritable> entry : mostDistantPoints.entrySet()) { + context.write(new IntWritable(entry.getKey()), entry.getValue()); + } + super.cleanup(context); + } + + @Override + protected void map(IntWritable clusterId, WeightedVectorWritable point, Context context) + throws IOException, InterruptedException { + mapPoint(clusterId, point, measure, representativePoints, mostDistantPoints); + } + + public static void mapPoint(IntWritable clusterId, + WeightedVectorWritable point, + DistanceMeasure measure, + Map<Integer, List<VectorWritable>> representativePoints, + Map<Integer, WeightedVectorWritable> mostDistantPoints) { + int key = clusterId.get(); + WeightedVectorWritable currentMDP = mostDistantPoints.get(key); + + List<VectorWritable> repPoints = representativePoints.get(key); + double totalDistance = 0.0; + if (repPoints != null) { + for (VectorWritable refPoint : repPoints) { + totalDistance += measure.distance(refPoint.get(), point.getVector()); + } + } + if (currentMDP == null || currentMDP.getWeight() < totalDistance) { + mostDistantPoints.put(key, new WeightedVectorWritable(totalDistance, point.getVector().clone())); + } + } + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + super.setup(context); + Configuration conf = context.getConfiguration(); + measure = + ClassUtils.instantiateAs(conf.get(RepresentativePointsDriver.DISTANCE_MEASURE_KEY), DistanceMeasure.class); + representativePoints = getRepresentativePoints(conf); + } + + public void configure(Map<Integer, List<VectorWritable>> referencePoints, DistanceMeasure measure) { + this.representativePoints = referencePoints; + this.measure = measure; + } + + public static Map<Integer, List<VectorWritable>> getRepresentativePoints(Configuration conf) { + String statePath = conf.get(RepresentativePointsDriver.STATE_IN_KEY); + return getRepresentativePoints(conf, new Path(statePath)); + } + + public static Map<Integer, List<VectorWritable>> getRepresentativePoints(Configuration conf, Path statePath) { + Map<Integer, List<VectorWritable>> representativePoints = new HashMap<>(); + for (Pair<IntWritable,VectorWritable> 
record + : new SequenceFileDirIterable<IntWritable,VectorWritable>(statePath, + PathType.LIST, + PathFilters.logsCRCFilter(), + conf)) { + int keyValue = record.getFirst().get(); + List<VectorWritable> repPoints = representativePoints.get(keyValue); + if (repPoints == null) { + repPoints = new ArrayList<>(); + representativePoints.put(keyValue, repPoints); + } + repPoints.add(record.getSecond()); + } + return representativePoints; + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsReducer.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsReducer.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsReducer.java new file mode 100644 index 0000000..27ca861 --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsReducer.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.clustering.evaluation; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.mahout.clustering.classify.WeightedVectorWritable; +import org.apache.mahout.math.VectorWritable; + +public class RepresentativePointsReducer + extends Reducer<IntWritable, WeightedVectorWritable, IntWritable, VectorWritable> { + + private Map<Integer, List<VectorWritable>> representativePoints; + + @Override + protected void cleanup(Context context) throws IOException, InterruptedException { + for (Map.Entry<Integer, List<VectorWritable>> entry : representativePoints.entrySet()) { + IntWritable iw = new IntWritable(entry.getKey()); + for (VectorWritable vw : entry.getValue()) { + context.write(iw, vw); + } + } + super.cleanup(context); + } + + @Override + protected void reduce(IntWritable key, Iterable<WeightedVectorWritable> values, Context context) + throws IOException, InterruptedException { + // find the most distant point + WeightedVectorWritable mdp = null; + for (WeightedVectorWritable dpw : values) { + if (mdp == null || mdp.getWeight() < dpw.getWeight()) { + mdp = new WeightedVectorWritable(dpw.getWeight(), dpw.getVector()); + } + } + context.write(new IntWritable(key.get()), new VectorWritable(mdp.getVector())); + } + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + super.setup(context); + Configuration conf = context.getConfiguration(); + representativePoints = RepresentativePointsMapper.getRepresentativePoints(conf); + } + + public void configure(Map<Integer, List<VectorWritable>> representativePoints) { + this.representativePoints = representativePoints; + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/lda/LDAPrintTopics.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/lda/LDAPrintTopics.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/lda/LDAPrintTopics.java new file mode 100644 index 0000000..392909e --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/clustering/lda/LDAPrintTopics.java @@ -0,0 +1,229 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.clustering.lda; + +import com.google.common.io.Closeables; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Queue; +import org.apache.commons.cli2.CommandLine; +import org.apache.commons.cli2.Group; +import org.apache.commons.cli2.Option; +import org.apache.commons.cli2.OptionException; +import org.apache.commons.cli2.builder.ArgumentBuilder; +import org.apache.commons.cli2.builder.DefaultOptionBuilder; +import org.apache.commons.cli2.builder.GroupBuilder; +import org.apache.commons.cli2.commandline.Parser; +import org.apache.commons.io.Charsets; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.mahout.common.CommandLineUtil; +import org.apache.mahout.common.IntPairWritable; +import org.apache.mahout.common.Pair; +import org.apache.mahout.common.commandline.DefaultOptionCreator; +import org.apache.mahout.common.iterator.sequencefile.PathType; +import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable; +import org.apache.mahout.utils.vectors.VectorHelper; + +/** + * Class to print out the top K words for each topic. + */ +public final class LDAPrintTopics { + + private LDAPrintTopics() { } + + // Expands the queue list to have a Queue for topic K + private static void ensureQueueSize(Collection<Queue<Pair<String,Double>>> queues, int k) { + for (int i = queues.size(); i <= k; ++i) { + queues.add(new PriorityQueue<Pair<String,Double>>()); + } + } + + public static void main(String[] args) throws Exception { + DefaultOptionBuilder obuilder = new DefaultOptionBuilder(); + ArgumentBuilder abuilder = new ArgumentBuilder(); + GroupBuilder gbuilder = new GroupBuilder(); + + Option inputOpt = DefaultOptionCreator.inputOption().create(); + + Option dictOpt = obuilder.withLongName("dict").withRequired(true).withArgument( + abuilder.withName("dict").withMinimum(1).withMaximum(1).create()).withDescription( + "Dictionary to read in, in the same format as one created by " + + "org.apache.mahout.utils.vectors.lucene.Driver").withShortName("d").create(); + + Option outOpt = DefaultOptionCreator.outputOption().create(); + + Option wordOpt = obuilder.withLongName("words").withRequired(false).withArgument( + abuilder.withName("words").withMinimum(0).withMaximum(1).withDefault("20").create()).withDescription( + "Number of words to print").withShortName("w").create(); + Option dictTypeOpt = obuilder.withLongName("dictionaryType").withRequired(false).withArgument( + abuilder.withName("dictionaryType").withMinimum(1).withMaximum(1).create()).withDescription( + "The dictionary file type (text|sequencefile)").withShortName("dt").create(); + Option helpOpt = obuilder.withLongName("help").withDescription("Print out help").withShortName("h") + .create(); + + Group group = gbuilder.withName("Options").withOption(dictOpt).withOption(outOpt).withOption(wordOpt) + .withOption(inputOpt).withOption(dictTypeOpt).create(); + try { + Parser parser = new Parser(); + parser.setGroup(group); + CommandLine cmdLine = parser.parse(args); + + if (cmdLine.hasOption(helpOpt)) { + CommandLineUtil.printHelp(group); + return; + } + 
+ String input = cmdLine.getValue(inputOpt).toString(); + String dictFile = cmdLine.getValue(dictOpt).toString(); + int numWords = 20; + if (cmdLine.hasOption(wordOpt)) { + numWords = Integer.parseInt(cmdLine.getValue(wordOpt).toString()); + } + Configuration config = new Configuration(); + + String dictionaryType = "text"; + if (cmdLine.hasOption(dictTypeOpt)) { + dictionaryType = cmdLine.getValue(dictTypeOpt).toString(); + } + + List<String> wordList; + if ("text".equals(dictionaryType)) { + wordList = Arrays.asList(VectorHelper.loadTermDictionary(new File(dictFile))); + } else if ("sequencefile".equals(dictionaryType)) { + wordList = Arrays.asList(VectorHelper.loadTermDictionary(config, dictFile)); + } else { + throw new IllegalArgumentException("Invalid dictionary format"); + } + + List<Queue<Pair<String,Double>>> topWords = topWordsForTopics(input, config, wordList, numWords); + + File output = null; + if (cmdLine.hasOption(outOpt)) { + output = new File(cmdLine.getValue(outOpt).toString()); + if (!output.exists() && !output.mkdirs()) { + throw new IOException("Could not create directory: " + output); + } + } + printTopWords(topWords, output); + } catch (OptionException e) { + CommandLineUtil.printHelp(group); + throw e; + } + } + + // Adds the word if the queue is below capacity, or the score is high enough + private static void maybeEnqueue(Queue<Pair<String,Double>> q, String word, double score, int numWordsToPrint) { + if (q.size() >= numWordsToPrint && score > q.peek().getSecond()) { + q.poll(); + } + if (q.size() < numWordsToPrint) { + q.add(new Pair<>(word, score)); + } + } + + private static void printTopWords(List<Queue<Pair<String,Double>>> topWords, File outputDir) + throws IOException { + for (int i = 0; i < topWords.size(); ++i) { + Collection<Pair<String,Double>> topK = topWords.get(i); + Writer out = null; + boolean printingToSystemOut = false; + try { + if (outputDir != null) { + out = new OutputStreamWriter(new FileOutputStream(new File(outputDir, "topic_" + i)), Charsets.UTF_8); + } else { + out = new OutputStreamWriter(System.out, Charsets.UTF_8); + printingToSystemOut = true; + out.write("Topic " + i); + out.write('\n'); + out.write("==========="); + out.write('\n'); + } + List<Pair<String,Double>> topKasList = new ArrayList<>(topK.size()); + for (Pair<String,Double> wordWithScore : topK) { + topKasList.add(wordWithScore); + } + Collections.sort(topKasList, new Comparator<Pair<String,Double>>() { + @Override + public int compare(Pair<String,Double> pair1, Pair<String,Double> pair2) { + return pair2.getSecond().compareTo(pair1.getSecond()); + } + }); + for (Pair<String,Double> wordWithScore : topKasList) { + out.write(wordWithScore.getFirst() + " [p(" + wordWithScore.getFirst() + "|topic_" + i + ") = " + + wordWithScore.getSecond()); + out.write('\n'); + } + } finally { + if (!printingToSystemOut) { + Closeables.close(out, false); + } else { + out.flush(); + } + } + } + } + + private static List<Queue<Pair<String,Double>>> topWordsForTopics(String dir, + Configuration job, + List<String> wordList, + int numWordsToPrint) { + List<Queue<Pair<String,Double>>> queues = new ArrayList<>(); + Map<Integer,Double> expSums = new HashMap<>(); + for (Pair<IntPairWritable,DoubleWritable> record + : new SequenceFileDirIterable<IntPairWritable, DoubleWritable>( + new Path(dir, "part-*"), PathType.GLOB, null, null, true, job)) { + IntPairWritable key = record.getFirst(); + int topic = key.getFirst(); + int word = key.getSecond(); + ensureQueueSize(queues, topic); + if (word >= 0 && 
topic >= 0) { + double score = record.getSecond().get(); + if (expSums.get(topic) == null) { + expSums.put(topic, 0.0); + } + expSums.put(topic, expSums.get(topic) + Math.exp(score)); + String realWord = wordList.get(word); + maybeEnqueue(queues.get(topic), realWord, score, numWordsToPrint); + } + } + for (int i = 0; i < queues.size(); i++) { + Queue<Pair<String,Double>> queue = queues.get(i); + Queue<Pair<String,Double>> newQueue = new PriorityQueue<>(queue.size()); + double norm = expSums.get(i); + for (Pair<String,Double> pair : queue) { + newQueue.add(new Pair<>(pair.getFirst(), Math.exp(pair.getSecond()) / norm)); + } + queues.set(i, newQueue); + } + return queues; + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/MailArchivesClusteringAnalyzer.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/MailArchivesClusteringAnalyzer.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/MailArchivesClusteringAnalyzer.java new file mode 100644 index 0000000..12ed471 --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/MailArchivesClusteringAnalyzer.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.mahout.text; + +import org.apache.lucene.analysis.TokenFilter; +import org.apache.lucene.analysis.TokenStream; +import org.apache.lucene.analysis.Tokenizer; +import org.apache.lucene.analysis.core.LowerCaseFilter; +import org.apache.lucene.analysis.core.StopFilter; +import org.apache.lucene.analysis.en.PorterStemFilter; +import org.apache.lucene.analysis.miscellaneous.ASCIIFoldingFilter; +import org.apache.lucene.analysis.standard.StandardFilter; +import org.apache.lucene.analysis.standard.StandardTokenizer; +import org.apache.lucene.analysis.tokenattributes.CharTermAttribute; +import org.apache.lucene.analysis.util.CharArraySet; +import org.apache.lucene.analysis.util.StopwordAnalyzerBase; + +import java.io.IOException; +import java.util.Arrays; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Custom Lucene Analyzer designed for aggressive feature reduction + * for clustering the ASF Mail Archives using an extended set of + * stop words, excluding non-alpha-numeric tokens, and porter stemming. 
+ */ +public final class MailArchivesClusteringAnalyzer extends StopwordAnalyzerBase { + // extended set of stop words composed of common mail terms like "hi", + // HTML tags, and Java keywords asmany of the messages in the archives + // are subversion check-in notifications + + private static final CharArraySet STOP_SET = new CharArraySet(Arrays.asList( + "3d","7bit","a0","about","above","abstract","across","additional","after", + "afterwards","again","against","align","all","almost","alone","along", + "already","also","although","always","am","among","amongst","amoungst", + "amount","an","and","another","any","anybody","anyhow","anyone","anything", + "anyway","anywhere","are","arial","around","as","ascii","assert","at", + "back","background","base64","bcc","be","became","because","become","becomes", + "becoming","been","before","beforehand","behind","being","below","beside", + "besides","between","beyond","bgcolor","blank","blockquote","body","boolean", + "border","both","br","break","but","by","can","cannot","cant","case","catch", + "cc","cellpadding","cellspacing","center","char","charset","cheers","class", + "co","color","colspan","com","con","const","continue","could","couldnt", + "cry","css","de","dear","default","did","didnt","different","div","do", + "does","doesnt","done","dont","double","down","due","during","each","eg", + "eight","either","else","elsewhere","empty","encoding","enough","enum", + "etc","eu","even","ever","every","everyone","everything","everywhere", + "except","extends","face","family","few","ffffff","final","finally","float", + "font","for","former","formerly","fri","from","further","get","give","go", + "good","got","goto","gt","h1","ha","had","has","hasnt","have","he","head", + "height","hello","helvetica","hence","her","here","hereafter","hereby", + "herein","hereupon","hers","herself","hi","him","himself","his","how", + "however","hr","href","html","http","https","id","ie","if","ill","im", + "image","img","implements","import","in","inc","instanceof","int","interface", + "into","is","isnt","iso-8859-1","it","its","itself","ive","just","keep", + "last","latter","latterly","least","left","less","li","like","long","look", + "lt","ltd","mail","mailto","many","margin","may","me","meanwhile","message", + "meta","might","mill","mine","mon","more","moreover","most","mostly","mshtml", + "mso","much","must","my","myself","name","namely","native","nbsp","need", + "neither","never","nevertheless","new","next","nine","no","nobody","none", + "noone","nor","not","nothing","now","nowhere","null","of","off","often", + "ok","on","once","only","onto","or","org","other","others","otherwise", + "our","ours","ourselves","out","over","own","package","pad","per","perhaps", + "plain","please","pm","printable","private","protected","public","put", + "quot","quote","r1","r2","rather","re","really","regards","reply","return", + "right","said","same","sans","sat","say","saying","see","seem","seemed", + "seeming","seems","serif","serious","several","she","short","should","show", + "side","since","sincere","six","sixty","size","so","solid","some","somehow", + "someone","something","sometime","sometimes","somewhere","span","src", + "static","still","strictfp","string","strong","style","stylesheet","subject", + "such","sun","super","sure","switch","synchronized","table","take","target", + "td","text","th","than","thanks","that","the","their","them","themselves", + "then","thence","there","thereafter","thereby","therefore","therein","thereupon", + 
"these","they","thick","thin","think","third","this","those","though", + "three","through","throughout","throw","throws","thru","thu","thus","tm", + "to","together","too","top","toward","towards","tr","transfer","transient", + "try","tue","type","ul","un","under","unsubscribe","until","up","upon", + "us","use","used","uses","using","valign","verdana","very","via","void", + "volatile","want","was","we","wed","weight","well","were","what","whatever", + "when","whence","whenever","where","whereafter","whereas","whereby","wherein", + "whereupon","wherever","whether","which","while","whither","who","whoever", + "whole","whom","whose","why","width","will","with","within","without", + "wont","would","wrote","www","yes","yet","you","your","yours","yourself", + "yourselves" + ), false); + + // Regex used to exclude non-alpha-numeric tokens + private static final Pattern ALPHA_NUMERIC = Pattern.compile("^[a-z][a-z0-9_]+$"); + private static final Matcher MATCHER = ALPHA_NUMERIC.matcher(""); + + public MailArchivesClusteringAnalyzer() { + super(STOP_SET); + } + + public MailArchivesClusteringAnalyzer(CharArraySet stopSet) { + super(stopSet); + } + + @Override + protected TokenStreamComponents createComponents(String fieldName) { + Tokenizer tokenizer = new StandardTokenizer(); + TokenStream result = new StandardFilter(tokenizer); + result = new LowerCaseFilter(result); + result = new ASCIIFoldingFilter(result); + result = new AlphaNumericMaxLengthFilter(result); + result = new StopFilter(result, STOP_SET); + result = new PorterStemFilter(result); + return new TokenStreamComponents(tokenizer, result); + } + + /** + * Matches alpha-numeric tokens between 2 and 40 chars long. + */ + static class AlphaNumericMaxLengthFilter extends TokenFilter { + private final CharTermAttribute termAtt; + private final char[] output = new char[28]; + + AlphaNumericMaxLengthFilter(TokenStream in) { + super(in); + termAtt = addAttribute(CharTermAttribute.class); + } + + @Override + public final boolean incrementToken() throws IOException { + // return the first alpha-numeric token between 2 and 40 length + while (input.incrementToken()) { + int length = termAtt.length(); + if (length >= 2 && length <= 28) { + char[] buf = termAtt.buffer(); + int at = 0; + for (int c = 0; c < length; c++) { + char ch = buf[c]; + if (ch != '\'') { + output[at++] = ch; + } + } + String term = new String(output, 0, at); + MATCHER.reset(term); + if (MATCHER.matches() && !term.startsWith("a0")) { + termAtt.setEmpty(); + termAtt.append(term); + return true; + } + } + } + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/MultipleTextFileInputFormat.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/MultipleTextFileInputFormat.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/MultipleTextFileInputFormat.java new file mode 100644 index 0000000..44df006 --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/MultipleTextFileInputFormat.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.text; + +import java.io.IOException; + +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader; +import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit; + +/** + * + * Used in combining a large number of text files into one text input reader + * along with the WholeFileRecordReader class. + * + */ +public class MultipleTextFileInputFormat extends CombineFileInputFormat<IntWritable, BytesWritable> { + + @Override + public RecordReader<IntWritable, BytesWritable> createRecordReader(InputSplit inputSplit, + TaskAttemptContext taskAttemptContext) + throws IOException { + return new CombineFileRecordReader<>((CombineFileSplit) inputSplit, + taskAttemptContext, WholeFileRecordReader.class); + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java new file mode 100644 index 0000000..37ebc44 --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.text; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.mahout.common.iterator.FileLineIterable; +import org.apache.mahout.utils.io.ChunkedWriter; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.Charset; +import java.util.Map; + +/** + * Default parser for parsing text into sequence files. + */ +public final class PrefixAdditionFilter extends SequenceFilesFromDirectoryFilter { + + public PrefixAdditionFilter(Configuration conf, + String keyPrefix, + Map<String, String> options, + ChunkedWriter writer, + Charset charset, + FileSystem fs) { + super(conf, keyPrefix, options, writer, charset, fs); + } + + @Override + protected void process(FileStatus fst, Path current) throws IOException { + FileSystem fs = getFs(); + ChunkedWriter writer = getWriter(); + if (fst.isDir()) { + String dirPath = getPrefix() + Path.SEPARATOR + current.getName() + Path.SEPARATOR + fst.getPath().getName(); + fs.listStatus(fst.getPath(), + new PrefixAdditionFilter(getConf(), dirPath, getOptions(), writer, getCharset(), fs)); + } else { + try (InputStream in = fs.open(fst.getPath())){ + StringBuilder file = new StringBuilder(); + for (String aFit : new FileLineIterable(in, getCharset(), false)) { + file.append(aFit).append('\n'); + } + String name = current.getName().equals(fst.getPath().getName()) + ? current.getName() + : current.getName() + Path.SEPARATOR + fst.getPath().getName(); + writer.write(getPrefix() + Path.SEPARATOR + name, file.toString()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java new file mode 100644 index 0000000..311ab8d --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java @@ -0,0 +1,214 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.text; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.hadoop.util.ToolRunner; +import org.apache.mahout.common.AbstractJob; +import org.apache.mahout.common.ClassUtils; +import org.apache.mahout.common.HadoopUtil; +import org.apache.mahout.common.commandline.DefaultOptionCreator; +import org.apache.mahout.utils.io.ChunkedWriter; + +/** + * Converts a directory of text documents into SequenceFiles of Specified chunkSize. This class takes in a + * parent directory containing sub folders of text documents and recursively reads the files and creates the + * {@link org.apache.hadoop.io.SequenceFile}s of docid => content. The docid is set as the relative path of the + * document from the parent directory prepended with a specified prefix. You can also specify the input encoding + * of the text files. The content of the output SequenceFiles are encoded as UTF-8 text. + */ +public class SequenceFilesFromDirectory extends AbstractJob { + + private static final String PREFIX_ADDITION_FILTER = PrefixAdditionFilter.class.getName(); + + private static final String[] CHUNK_SIZE_OPTION = {"chunkSize", "chunk"}; + public static final String[] FILE_FILTER_CLASS_OPTION = {"fileFilterClass", "filter"}; + private static final String[] CHARSET_OPTION = {"charset", "c"}; + + private static final int MAX_JOB_SPLIT_LOCATIONS = 1000000; + + public static final String[] KEY_PREFIX_OPTION = {"keyPrefix", "prefix"}; + public static final String BASE_INPUT_PATH = "baseinputpath"; + + public static void main(String[] args) throws Exception { + ToolRunner.run(new SequenceFilesFromDirectory(), args); + } + + /* + * callback main after processing MapReduce parameters + */ + @Override + public int run(String[] args) throws Exception { + addOptions(); + addOption(DefaultOptionCreator.methodOption().create()); + addOption(DefaultOptionCreator.overwriteOption().create()); + + if (parseArguments(args) == null) { + return -1; + } + + Map<String, String> options = parseOptions(); + Path output = getOutputPath(); + if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) { + HadoopUtil.delete(getConf(), output); + } + + if (getOption(DefaultOptionCreator.METHOD_OPTION, + DefaultOptionCreator.MAPREDUCE_METHOD).equals(DefaultOptionCreator.SEQUENTIAL_METHOD)) { + runSequential(getConf(), getInputPath(), output, options); + } else { + runMapReduce(getInputPath(), output); + } + + return 0; + } + + private int runSequential(Configuration conf, Path input, Path output, Map<String, String> options) + throws IOException, InterruptedException, NoSuchMethodException { + // Running sequentially + Charset charset = Charset.forName(getOption(CHARSET_OPTION[0])); + String keyPrefix = getOption(KEY_PREFIX_OPTION[0]); + FileSystem fs = FileSystem.get(input.toUri(), conf); + + try (ChunkedWriter writer = new ChunkedWriter(conf, Integer.parseInt(options.get(CHUNK_SIZE_OPTION[0])), output)) { + 
SequenceFilesFromDirectoryFilter pathFilter; + String fileFilterClassName = options.get(FILE_FILTER_CLASS_OPTION[0]); + if (PrefixAdditionFilter.class.getName().equals(fileFilterClassName)) { + pathFilter = new PrefixAdditionFilter(conf, keyPrefix, options, writer, charset, fs); + } else { + pathFilter = ClassUtils.instantiateAs(fileFilterClassName, SequenceFilesFromDirectoryFilter.class, + new Class[] {Configuration.class, String.class, Map.class, ChunkedWriter.class, Charset.class, FileSystem.class}, + new Object[] {conf, keyPrefix, options, writer, charset, fs}); + } + fs.listStatus(input, pathFilter); + } + return 0; + } + + private int runMapReduce(Path input, Path output) throws IOException, ClassNotFoundException, InterruptedException { + + int chunkSizeInMB = 64; + if (hasOption(CHUNK_SIZE_OPTION[0])) { + chunkSizeInMB = Integer.parseInt(getOption(CHUNK_SIZE_OPTION[0])); + } + + String keyPrefix = null; + if (hasOption(KEY_PREFIX_OPTION[0])) { + keyPrefix = getOption(KEY_PREFIX_OPTION[0]); + } + + String fileFilterClassName = null; + if (hasOption(FILE_FILTER_CLASS_OPTION[0])) { + fileFilterClassName = getOption(FILE_FILTER_CLASS_OPTION[0]); + } + + PathFilter pathFilter = null; + // Prefix Addition is presently handled in the Mapper and unlike runsequential() + // need not be done via a pathFilter + if (!StringUtils.isBlank(fileFilterClassName) && !PrefixAdditionFilter.class.getName().equals(fileFilterClassName)) { + try { + pathFilter = (PathFilter) Class.forName(fileFilterClassName).newInstance(); + } catch (InstantiationException | IllegalAccessException e) { + throw new IllegalStateException(e); + } + } + + // Prepare Job for submission. + Job job = prepareJob(input, output, MultipleTextFileInputFormat.class, + SequenceFilesFromDirectoryMapper.class, Text.class, Text.class, + SequenceFileOutputFormat.class, "SequenceFilesFromDirectory"); + + Configuration jobConfig = job.getConfiguration(); + jobConfig.set(KEY_PREFIX_OPTION[0], keyPrefix); + jobConfig.set(FILE_FILTER_CLASS_OPTION[0], fileFilterClassName); + + FileSystem fs = FileSystem.get(jobConfig); + FileStatus fsFileStatus = fs.getFileStatus(input); + + String inputDirList; + if (pathFilter != null) { + inputDirList = HadoopUtil.buildDirList(fs, fsFileStatus, pathFilter); + } else { + inputDirList = HadoopUtil.buildDirList(fs, fsFileStatus); + } + + jobConfig.set(BASE_INPUT_PATH, input.toString()); + + long chunkSizeInBytes = chunkSizeInMB * 1024 * 1024; + + // set the max split locations, otherwise we get nasty debug stuff + jobConfig.set("mapreduce.job.max.split.locations", String.valueOf(MAX_JOB_SPLIT_LOCATIONS)); + + FileInputFormat.setInputPaths(job, inputDirList); + // need to set this to a multiple of the block size, or no split happens + FileInputFormat.setMaxInputSplitSize(job, chunkSizeInBytes); + FileOutputFormat.setCompressOutput(job, true); + + boolean succeeded = job.waitForCompletion(true); + if (!succeeded) { + return -1; + } + return 0; + } + + /** + * Override this method in order to add additional options to the command line of the SequenceFileFromDirectory job. + * Do not forget to call super() otherwise all standard options (input/output dirs etc) will not be available. + */ + protected void addOptions() { + addInputOption(); + addOutputOption(); + addOption(DefaultOptionCreator.overwriteOption().create()); + addOption(DefaultOptionCreator.methodOption().create()); + addOption(CHUNK_SIZE_OPTION[0], CHUNK_SIZE_OPTION[1], "The chunkSize in MegaBytes. 
Defaults to 64", "64"); + addOption(FILE_FILTER_CLASS_OPTION[0], FILE_FILTER_CLASS_OPTION[1], + "The name of the class to use for file parsing. Default: " + PREFIX_ADDITION_FILTER, PREFIX_ADDITION_FILTER); + addOption(KEY_PREFIX_OPTION[0], KEY_PREFIX_OPTION[1], "The prefix to be prepended to the key", ""); + addOption(CHARSET_OPTION[0], CHARSET_OPTION[1], + "The name of the character encoding of the input files. Default to UTF-8", "UTF-8"); + } + + /** + * Override this method in order to parse your additional options from the command line. Do not forget to call + * super() otherwise standard options (input/output dirs etc) will not be available. + * + * @return Map of options + */ + protected Map<String, String> parseOptions() { + Map<String, String> options = new HashMap<>(); + options.put(CHUNK_SIZE_OPTION[0], getOption(CHUNK_SIZE_OPTION[0])); + options.put(FILE_FILTER_CLASS_OPTION[0], getOption(FILE_FILTER_CLASS_OPTION[0])); + options.put(CHARSET_OPTION[0], getOption(CHARSET_OPTION[0])); + return options; + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryFilter.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryFilter.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryFilter.java new file mode 100644 index 0000000..6e4bd64 --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryFilter.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.text; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.mahout.utils.io.ChunkedWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.Map; + +/** + * Implement this interface if you wish to extend SequenceFilesFromDirectory with your own parsing logic. 
+ */ +public abstract class SequenceFilesFromDirectoryFilter implements PathFilter { + private static final Logger log = LoggerFactory.getLogger(SequenceFilesFromDirectoryFilter.class); + + private final String prefix; + private final ChunkedWriter writer; + private final Charset charset; + private final FileSystem fs; + private final Map<String, String> options; + private final Configuration conf; + + protected SequenceFilesFromDirectoryFilter(Configuration conf, + String keyPrefix, + Map<String, String> options, + ChunkedWriter writer, + Charset charset, + FileSystem fs) { + this.prefix = keyPrefix; + this.writer = writer; + this.charset = charset; + this.fs = fs; + this.options = options; + this.conf = conf; + } + + protected final String getPrefix() { + return prefix; + } + + protected final ChunkedWriter getWriter() { + return writer; + } + + protected final Charset getCharset() { + return charset; + } + + protected final FileSystem getFs() { + return fs; + } + + protected final Map<String, String> getOptions() { + return options; + } + + protected final Configuration getConf() { + return conf; + } + + @Override + public final boolean accept(Path current) { + log.debug("CURRENT: {}", current.getName()); + try { + for (FileStatus fst : fs.listStatus(current)) { + log.debug("CHILD: {}", fst.getPath().getName()); + process(fst, current); + } + } catch (IOException ioe) { + throw new IllegalStateException(ioe); + } + return false; + } + + protected abstract void process(FileStatus in, Path current) throws IOException; +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java new file mode 100644 index 0000000..40df3c2 --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.text; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit; +import org.apache.mahout.common.HadoopUtil; + +import static org.apache.mahout.text.SequenceFilesFromDirectory.KEY_PREFIX_OPTION; + +/** + * Map class for SequenceFilesFromDirectory MR job + */ +public class SequenceFilesFromDirectoryMapper extends Mapper<IntWritable, BytesWritable, Text, Text> { + + private String keyPrefix; + private Text fileValue = new Text(); + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + super.setup(context); + this.keyPrefix = context.getConfiguration().get(KEY_PREFIX_OPTION[0], ""); + } + + public void map(IntWritable key, BytesWritable value, Context context) + throws IOException, InterruptedException { + + Configuration configuration = context.getConfiguration(); + Path filePath = ((CombineFileSplit) context.getInputSplit()).getPath(key.get()); + String relativeFilePath = HadoopUtil.calcRelativeFilePath(configuration, filePath); + + String filename = this.keyPrefix.length() > 0 ? + this.keyPrefix + Path.SEPARATOR + relativeFilePath : + Path.SEPARATOR + relativeFilePath; + + fileValue.set(value.getBytes(), 0, value.getBytes().length); + context.write(new Text(filename), fileValue); + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java new file mode 100644 index 0000000..c17cc12 --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java @@ -0,0 +1,369 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.mahout.text; + +import org.apache.commons.io.DirectoryWalker; +import org.apache.commons.io.comparator.CompositeFileComparator; +import org.apache.commons.io.comparator.DirectoryFileComparator; +import org.apache.commons.io.comparator.PathFileComparator; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.hadoop.util.ToolRunner; +import org.apache.mahout.common.AbstractJob; +import org.apache.mahout.common.HadoopUtil; +import org.apache.mahout.common.commandline.DefaultOptionCreator; +import org.apache.mahout.utils.email.MailOptions; +import org.apache.mahout.utils.email.MailProcessor; +import org.apache.mahout.utils.io.ChunkedWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Comparator; +import java.util.Deque; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; + +/** + * Converts a directory of gzipped mail archives into SequenceFiles of specified + * chunkSize. This class is similar to {@link SequenceFilesFromDirectory} except + * it uses block-compressed {@link org.apache.hadoop.io.SequenceFile}s and parses out the subject and + * body text of each mail message into a separate key/value pair. + */ +public final class SequenceFilesFromMailArchives extends AbstractJob { + + private static final Logger log = LoggerFactory.getLogger(SequenceFilesFromMailArchives.class); + + public static final String[] CHUNK_SIZE_OPTION = {"chunkSize", "chunk"}; + public static final String[] KEY_PREFIX_OPTION = {"keyPrefix", "prefix"}; + public static final String[] CHARSET_OPTION = {"charset", "c"}; + public static final String[] SUBJECT_OPTION = {"subject", "s"}; + public static final String[] TO_OPTION = {"to", "to"}; + public static final String[] FROM_OPTION = {"from", "from"}; + public static final String[] REFERENCES_OPTION = {"references", "refs"}; + public static final String[] BODY_OPTION = {"body", "b"}; + public static final String[] STRIP_QUOTED_OPTION = {"stripQuoted", "q"}; + public static final String[] QUOTED_REGEX_OPTION = {"quotedRegex", "regex"}; + public static final String[] SEPARATOR_OPTION = {"separator", "sep"}; + public static final String[] BODY_SEPARATOR_OPTION = {"bodySeparator", "bodySep"}; + public static final String BASE_INPUT_PATH = "baseinputpath"; + + private static final int MAX_JOB_SPLIT_LOCATIONS = 1000000; + + public void createSequenceFiles(MailOptions options) throws IOException { + try (ChunkedWriter writer = + new ChunkedWriter(getConf(), options.getChunkSize(), new Path(options.getOutputDir()))){ + MailProcessor processor = new MailProcessor(options, options.getPrefix(), writer); + if (options.getInput().isDirectory()) { + PrefixAdditionDirectoryWalker walker = new PrefixAdditionDirectoryWalker(processor, writer); + walker.walk(options.getInput()); + log.info("Parsed {} messages from {}", walker.getMessageCount(), options.getInput().getAbsolutePath()); + } else { + long start = System.currentTimeMillis(); + long 
cnt = processor.parseMboxLineByLine(options.getInput()); + long finish = System.currentTimeMillis(); + log.info("Parsed {} messages from {} in time: {}", cnt, options.getInput().getAbsolutePath(), finish - start); + } + } + } + + private static class PrefixAdditionDirectoryWalker extends DirectoryWalker<Object> { + + @SuppressWarnings("unchecked") + private static final Comparator<File> FILE_COMPARATOR = new CompositeFileComparator( + DirectoryFileComparator.DIRECTORY_REVERSE, PathFileComparator.PATH_COMPARATOR); + + private final Deque<MailProcessor> processors = new ArrayDeque<>(); + private final ChunkedWriter writer; + private final Deque<Long> messageCounts = new ArrayDeque<>(); + + public PrefixAdditionDirectoryWalker(MailProcessor processor, ChunkedWriter writer) { + processors.addFirst(processor); + this.writer = writer; + messageCounts.addFirst(0L); + } + + public void walk(File startDirectory) throws IOException { + super.walk(startDirectory, null); + } + + public long getMessageCount() { + return messageCounts.getFirst(); + } + + @Override + protected void handleDirectoryStart(File current, int depth, Collection<Object> results) throws IOException { + if (depth > 0) { + log.info("At {}", current.getAbsolutePath()); + MailProcessor processor = processors.getFirst(); + MailProcessor subDirProcessor = new MailProcessor(processor.getOptions(), processor.getPrefix() + + File.separator + current.getName(), writer); + processors.push(subDirProcessor); + messageCounts.push(0L); + } + } + + @Override + protected File[] filterDirectoryContents(File directory, int depth, File[] files) throws IOException { + Arrays.sort(files, FILE_COMPARATOR); + return files; + } + + @Override + protected void handleFile(File current, int depth, Collection<Object> results) throws IOException { + MailProcessor processor = processors.getFirst(); + long currentDirMessageCount = messageCounts.pop(); + try { + currentDirMessageCount += processor.parseMboxLineByLine(current); + } catch (IOException e) { + throw new IllegalStateException("Error processing " + current, e); + } + messageCounts.push(currentDirMessageCount); + } + + @Override + protected void handleDirectoryEnd(File current, int depth, Collection<Object> results) throws IOException { + if (depth > 0) { + final long currentDirMessageCount = messageCounts.pop(); + log.info("Parsed {} messages from directory {}", currentDirMessageCount, current.getAbsolutePath()); + + processors.pop(); + + // aggregate message counts + long parentDirMessageCount = messageCounts.pop(); + parentDirMessageCount += currentDirMessageCount; + messageCounts.push(parentDirMessageCount); + } + } + } + + public static void main(String[] args) throws Exception { + ToolRunner.run(new Configuration(), new SequenceFilesFromMailArchives(), args); + } + + @Override + public int run(String[] args) throws Exception { + addInputOption(); + addOutputOption(); + addOption(DefaultOptionCreator.methodOption().create()); + + addOption(CHUNK_SIZE_OPTION[0], CHUNK_SIZE_OPTION[1], "The chunkSize in MegaBytes. Defaults to 64", "64"); + addOption(KEY_PREFIX_OPTION[0], KEY_PREFIX_OPTION[1], "The prefix to be prepended to the key", ""); + addOption(CHARSET_OPTION[0], CHARSET_OPTION[1], + "The name of the character encoding of the input files. Default to UTF-8", "UTF-8"); + addFlag(SUBJECT_OPTION[0], SUBJECT_OPTION[1], "Include the Mail subject as part of the text. Default is false"); + addFlag(TO_OPTION[0], TO_OPTION[1], "Include the to field in the text. 
Default is false"); + addFlag(FROM_OPTION[0], FROM_OPTION[1], "Include the from field in the text. Default is false"); + addFlag(REFERENCES_OPTION[0], REFERENCES_OPTION[1], + "Include the references field in the text. Default is false"); + addFlag(BODY_OPTION[0], BODY_OPTION[1], "Include the body in the output. Default is false"); + addFlag(STRIP_QUOTED_OPTION[0], STRIP_QUOTED_OPTION[1], + "Strip (remove) quoted email text in the body. Default is false"); + addOption(QUOTED_REGEX_OPTION[0], QUOTED_REGEX_OPTION[1], + "Specify the regex that identifies quoted text. " + + "Default is to look for > or | at the beginning of the line."); + addOption(SEPARATOR_OPTION[0], SEPARATOR_OPTION[1], + "The separator to use between metadata items (to, from, etc.). Default is \\n", "\n"); + addOption(BODY_SEPARATOR_OPTION[0], BODY_SEPARATOR_OPTION[1], + "The separator to use between lines in the body. Default is \\n. " + + "Useful to change if you wish to have the message be on one line", "\n"); + + addOption(DefaultOptionCreator.helpOption()); + Map<String, List<String>> parsedArgs = parseArguments(args); + if (parsedArgs == null) { + return -1; + } + File input = getInputFile(); + String outputDir = getOutputPath().toString(); + + int chunkSize = 64; + if (hasOption(CHUNK_SIZE_OPTION[0])) { + chunkSize = Integer.parseInt(getOption(CHUNK_SIZE_OPTION[0])); + } + + String prefix = ""; + if (hasOption(KEY_PREFIX_OPTION[0])) { + prefix = getOption(KEY_PREFIX_OPTION[0]); + } + + Charset charset = Charset.forName(getOption(CHARSET_OPTION[0])); + MailOptions options = new MailOptions(); + options.setInput(input); + options.setOutputDir(outputDir); + options.setPrefix(prefix); + options.setChunkSize(chunkSize); + options.setCharset(charset); + + List<Pattern> patterns = new ArrayList<>(5); + // patternOrder is used downstream so that we can know what order the text + // is in instead of encoding it in the string, which + // would require more processing later to remove it pre feature selection. 
+ Map<String, Integer> patternOrder = new HashMap<>(); + int order = 0; + if (hasOption(FROM_OPTION[0])) { + patterns.add(MailProcessor.FROM_PREFIX); + patternOrder.put(MailOptions.FROM, order++); + } + if (hasOption(TO_OPTION[0])) { + patterns.add(MailProcessor.TO_PREFIX); + patternOrder.put(MailOptions.TO, order++); + } + if (hasOption(REFERENCES_OPTION[0])) { + patterns.add(MailProcessor.REFS_PREFIX); + patternOrder.put(MailOptions.REFS, order++); + } + if (hasOption(SUBJECT_OPTION[0])) { + patterns.add(MailProcessor.SUBJECT_PREFIX); + patternOrder.put(MailOptions.SUBJECT, order += 1); + } + options.setStripQuotedText(hasOption(STRIP_QUOTED_OPTION[0])); + + options.setPatternsToMatch(patterns.toArray(new Pattern[patterns.size()])); + options.setPatternOrder(patternOrder); + options.setIncludeBody(hasOption(BODY_OPTION[0])); + + if (hasOption(SEPARATOR_OPTION[0])) { + options.setSeparator(getOption(SEPARATOR_OPTION[0])); + } else { + options.setSeparator("\n"); + } + + if (hasOption(BODY_SEPARATOR_OPTION[0])) { + options.setBodySeparator(getOption(BODY_SEPARATOR_OPTION[0])); + } + + if (hasOption(QUOTED_REGEX_OPTION[0])) { + options.setQuotedTextPattern(Pattern.compile(getOption(QUOTED_REGEX_OPTION[0]))); + } + + if (getOption(DefaultOptionCreator.METHOD_OPTION, + DefaultOptionCreator.MAPREDUCE_METHOD).equals(DefaultOptionCreator.SEQUENTIAL_METHOD)) { + runSequential(options); + } else { + runMapReduce(getInputPath(), getOutputPath()); + } + + return 0; + } + + private int runSequential(MailOptions options) + throws IOException, InterruptedException, NoSuchMethodException { + + long start = System.currentTimeMillis(); + createSequenceFiles(options); + long finish = System.currentTimeMillis(); + log.info("Conversion took {}ms", finish - start); + + return 0; + } + + private int runMapReduce(Path input, Path output) throws IOException, InterruptedException, ClassNotFoundException { + + Job job = prepareJob(input, output, MultipleTextFileInputFormat.class, SequenceFilesFromMailArchivesMapper.class, + Text.class, Text.class, SequenceFileOutputFormat.class, "SequentialFilesFromMailArchives"); + + Configuration jobConfig = job.getConfiguration(); + + if (hasOption(KEY_PREFIX_OPTION[0])) { + jobConfig.set(KEY_PREFIX_OPTION[1], getOption(KEY_PREFIX_OPTION[0])); + } + + int chunkSize = 0; + if (hasOption(CHUNK_SIZE_OPTION[0])) { + chunkSize = Integer.parseInt(getOption(CHUNK_SIZE_OPTION[0])); + jobConfig.set(CHUNK_SIZE_OPTION[0], String.valueOf(chunkSize)); + } + + Charset charset; + if (hasOption(CHARSET_OPTION[0])) { + charset = Charset.forName(getOption(CHARSET_OPTION[0])); + jobConfig.set(CHARSET_OPTION[0], charset.displayName()); + } + + if (hasOption(FROM_OPTION[0])) { + jobConfig.set(FROM_OPTION[1], "true"); + } + + if (hasOption(TO_OPTION[0])) { + jobConfig.set(TO_OPTION[1], "true"); + } + + if (hasOption(REFERENCES_OPTION[0])) { + jobConfig.set(REFERENCES_OPTION[1], "true"); + } + + if (hasOption(SUBJECT_OPTION[0])) { + jobConfig.set(SUBJECT_OPTION[1], "true"); + } + + if (hasOption(QUOTED_REGEX_OPTION[0])) { + jobConfig.set(QUOTED_REGEX_OPTION[1], Pattern.compile(getOption(QUOTED_REGEX_OPTION[0])).toString()); + } + + if (hasOption(SEPARATOR_OPTION[0])) { + jobConfig.set(SEPARATOR_OPTION[1], getOption(SEPARATOR_OPTION[0])); + } else { + jobConfig.set(SEPARATOR_OPTION[1], "\n"); + } + + if (hasOption(BODY_OPTION[0])) { + jobConfig.set(BODY_OPTION[1], "true"); + } else { + jobConfig.set(BODY_OPTION[1], "false"); + } + + if (hasOption(BODY_SEPARATOR_OPTION[0])) { + 
jobConfig.set(BODY_SEPARATOR_OPTION[1], getOption(BODY_SEPARATOR_OPTION[0])); + } else { + jobConfig.set(BODY_SEPARATOR_OPTION[1], "\n"); + } + + FileSystem fs = FileSystem.get(jobConfig); + FileStatus fsFileStatus = fs.getFileStatus(inputPath); + + jobConfig.set(BASE_INPUT_PATH, inputPath.toString()); + String inputDirList = HadoopUtil.buildDirList(fs, fsFileStatus); + FileInputFormat.setInputPaths(job, inputDirList); + + long chunkSizeInBytes = chunkSize * 1024 * 1024; + // need to set this to a multiple of the block size, or no split happens + FileInputFormat.setMaxInputSplitSize(job, chunkSizeInBytes); + + // set the max split locations, otherwise we get nasty debug stuff + jobConfig.set("mapreduce.job.max.split.locations", String.valueOf(MAX_JOB_SPLIT_LOCATIONS)); + + boolean succeeded = job.waitForCompletion(true); + if (!succeeded) { + return -1; + } + return 0; + } +}
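For reference, here is a minimal sketch of how the two drivers above might be invoked programmatically through ToolRunner. It assumes the standard AbstractJob --input/--output options in addition to the options registered in run()/addOptions(); the example class name and the input/output paths are hypothetical placeholders, not part of the commit.

import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.text.SequenceFilesFromDirectory;
import org.apache.mahout.text.SequenceFilesFromMailArchives;

/** Hypothetical example driver; all paths below are placeholders. */
public class SeqFileConversionExample {
  public static void main(String[] args) throws Exception {
    // Plain-text documents -> SequenceFile of docid => content; each key is the
    // file's relative path prefixed with --keyPrefix.
    ToolRunner.run(new SequenceFilesFromDirectory(), new String[] {
        "--input", "/data/raw-docs",
        "--output", "/data/seq-docs",
        "--chunkSize", "64",          // chunk size in MB (default 64)
        "--keyPrefix", "docs",
        "--charset", "UTF-8",
        "--method", "sequential",     // or "mapreduce"
        "--overwrite"
    });

    // Gzipped mail archives -> SequenceFile of subject/body text per message.
    ToolRunner.run(new SequenceFilesFromMailArchives(), new String[] {
        "--input", "/data/mail-archives",
        "--output", "/data/seq-mail",
        "--chunkSize", "64",
        "--subject",                  // include the Subject: header
        "--body",                     // include the message body
        "--method", "sequential"
    });
  }
}

In mapreduce mode the relevant options are copied into the job configuration, and the chunk size (in megabytes) is also used as the maximum input-split size handed to FileInputFormat.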