http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsMapper.java ---------------------------------------------------------------------- diff --git a/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsMapper.java b/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsMapper.java deleted file mode 100644 index 0ae79ad..0000000 --- a/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsMapper.java +++ /dev/null @@ -1,117 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.clustering.evaluation; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.mahout.clustering.classify.WeightedVectorWritable; -import org.apache.mahout.common.ClassUtils; -import org.apache.mahout.common.Pair; -import org.apache.mahout.common.distance.DistanceMeasure; -import org.apache.mahout.common.distance.EuclideanDistanceMeasure; -import org.apache.mahout.common.iterator.sequencefile.PathFilters; -import org.apache.mahout.common.iterator.sequencefile.PathType; -import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable; -import org.apache.mahout.math.VectorWritable; - -public class RepresentativePointsMapper - extends Mapper<IntWritable, WeightedVectorWritable, IntWritable, WeightedVectorWritable> { - - private Map<Integer, List<VectorWritable>> representativePoints; - private final Map<Integer, WeightedVectorWritable> mostDistantPoints = new HashMap<>(); - private DistanceMeasure measure = new EuclideanDistanceMeasure(); - - @Override - protected void cleanup(Context context) throws IOException, InterruptedException { - for (Map.Entry<Integer, WeightedVectorWritable> entry : mostDistantPoints.entrySet()) { - context.write(new IntWritable(entry.getKey()), entry.getValue()); - } - super.cleanup(context); - } - - @Override - protected void map(IntWritable clusterId, WeightedVectorWritable point, Context context) - throws IOException, InterruptedException { - mapPoint(clusterId, point, measure, representativePoints, mostDistantPoints); - } - - public static void mapPoint(IntWritable clusterId, - WeightedVectorWritable point, - DistanceMeasure measure, - Map<Integer, List<VectorWritable>> representativePoints, - Map<Integer, WeightedVectorWritable> mostDistantPoints) { - int key = clusterId.get(); - WeightedVectorWritable currentMDP = mostDistantPoints.get(key); - - 
List<VectorWritable> repPoints = representativePoints.get(key); - double totalDistance = 0.0; - if (repPoints != null) { - for (VectorWritable refPoint : repPoints) { - totalDistance += measure.distance(refPoint.get(), point.getVector()); - } - } - if (currentMDP == null || currentMDP.getWeight() < totalDistance) { - mostDistantPoints.put(key, new WeightedVectorWritable(totalDistance, point.getVector().clone())); - } - } - - @Override - protected void setup(Context context) throws IOException, InterruptedException { - super.setup(context); - Configuration conf = context.getConfiguration(); - measure = - ClassUtils.instantiateAs(conf.get(RepresentativePointsDriver.DISTANCE_MEASURE_KEY), DistanceMeasure.class); - representativePoints = getRepresentativePoints(conf); - } - - public void configure(Map<Integer, List<VectorWritable>> referencePoints, DistanceMeasure measure) { - this.representativePoints = referencePoints; - this.measure = measure; - } - - public static Map<Integer, List<VectorWritable>> getRepresentativePoints(Configuration conf) { - String statePath = conf.get(RepresentativePointsDriver.STATE_IN_KEY); - return getRepresentativePoints(conf, new Path(statePath)); - } - - public static Map<Integer, List<VectorWritable>> getRepresentativePoints(Configuration conf, Path statePath) { - Map<Integer, List<VectorWritable>> representativePoints = new HashMap<>(); - for (Pair<IntWritable,VectorWritable> record - : new SequenceFileDirIterable<IntWritable,VectorWritable>(statePath, - PathType.LIST, - PathFilters.logsCRCFilter(), - conf)) { - int keyValue = record.getFirst().get(); - List<VectorWritable> repPoints = representativePoints.get(keyValue); - if (repPoints == null) { - repPoints = new ArrayList<>(); - representativePoints.put(keyValue, repPoints); - } - repPoints.add(record.getSecond()); - } - return representativePoints; - } -}
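For reference, the RepresentativePointsMapper removed above keeps, for each cluster id, the input point whose summed distance to that cluster's current representative points is largest. The following is a minimal, self-contained sketch of that bookkeeping using the class's own static mapPoint method, assuming the Mahout 0.x classes shown in the diff (RepresentativePointsMapper, WeightedVectorWritable, DenseVector, EuclideanDistanceMeasure) are on the classpath; the cluster id and vector values are illustrative only and do not come from the commit.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.io.IntWritable;
import org.apache.mahout.clustering.classify.WeightedVectorWritable;
import org.apache.mahout.clustering.evaluation.RepresentativePointsMapper;
import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.VectorWritable;

public class RepresentativePointsSketch {
  public static void main(String[] args) {
    // Representative points already chosen for cluster 0 (illustrative values).
    Map<Integer, List<VectorWritable>> repPoints = new HashMap<>();
    List<VectorWritable> cluster0 = new ArrayList<>();
    cluster0.add(new VectorWritable(new DenseVector(new double[] {0.0, 0.0})));
    repPoints.put(0, cluster0);

    // State the mapper would normally accumulate between map() calls.
    Map<Integer, WeightedVectorWritable> mostDistant = new HashMap<>();
    EuclideanDistanceMeasure measure = new EuclideanDistanceMeasure();

    // Feed two candidate points for cluster 0; the farther one is retained.
    RepresentativePointsMapper.mapPoint(new IntWritable(0),
        new WeightedVectorWritable(1.0, new DenseVector(new double[] {1.0, 1.0})),
        measure, repPoints, mostDistant);
    RepresentativePointsMapper.mapPoint(new IntWritable(0),
        new WeightedVectorWritable(1.0, new DenseVector(new double[] {3.0, 4.0})),
        measure, repPoints, mostDistant);

    // The stored weight is the total distance of the retained point: 5.0 for (3,4) here.
    System.out.println(mostDistant.get(0));
  }
}

In the actual job, cleanup() emits one such most-distant point per cluster, and the reducer below keeps the single farthest one across all map tasks.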
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsReducer.java ---------------------------------------------------------------------- diff --git a/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsReducer.java b/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsReducer.java deleted file mode 100644 index 27ca861..0000000 --- a/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsReducer.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.clustering.evaluation; - -import java.io.IOException; -import java.util.List; -import java.util.Map; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.mahout.clustering.classify.WeightedVectorWritable; -import org.apache.mahout.math.VectorWritable; - -public class RepresentativePointsReducer - extends Reducer<IntWritable, WeightedVectorWritable, IntWritable, VectorWritable> { - - private Map<Integer, List<VectorWritable>> representativePoints; - - @Override - protected void cleanup(Context context) throws IOException, InterruptedException { - for (Map.Entry<Integer, List<VectorWritable>> entry : representativePoints.entrySet()) { - IntWritable iw = new IntWritable(entry.getKey()); - for (VectorWritable vw : entry.getValue()) { - context.write(iw, vw); - } - } - super.cleanup(context); - } - - @Override - protected void reduce(IntWritable key, Iterable<WeightedVectorWritable> values, Context context) - throws IOException, InterruptedException { - // find the most distant point - WeightedVectorWritable mdp = null; - for (WeightedVectorWritable dpw : values) { - if (mdp == null || mdp.getWeight() < dpw.getWeight()) { - mdp = new WeightedVectorWritable(dpw.getWeight(), dpw.getVector()); - } - } - context.write(new IntWritable(key.get()), new VectorWritable(mdp.getVector())); - } - - @Override - protected void setup(Context context) throws IOException, InterruptedException { - super.setup(context); - Configuration conf = context.getConfiguration(); - representativePoints = RepresentativePointsMapper.getRepresentativePoints(conf); - } - - public void configure(Map<Integer, List<VectorWritable>> representativePoints) { - this.representativePoints = representativePoints; - } - -} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/clustering/lda/LDAPrintTopics.java ---------------------------------------------------------------------- diff --git 
a/integration/src/main/java/org/apache/mahout/clustering/lda/LDAPrintTopics.java b/integration/src/main/java/org/apache/mahout/clustering/lda/LDAPrintTopics.java deleted file mode 100644 index 392909e..0000000 --- a/integration/src/main/java/org/apache/mahout/clustering/lda/LDAPrintTopics.java +++ /dev/null @@ -1,229 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.clustering.lda; - -import com.google.common.io.Closeables; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.io.Writer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.PriorityQueue; -import java.util.Queue; -import org.apache.commons.cli2.CommandLine; -import org.apache.commons.cli2.Group; -import org.apache.commons.cli2.Option; -import org.apache.commons.cli2.OptionException; -import org.apache.commons.cli2.builder.ArgumentBuilder; -import org.apache.commons.cli2.builder.DefaultOptionBuilder; -import org.apache.commons.cli2.builder.GroupBuilder; -import org.apache.commons.cli2.commandline.Parser; -import org.apache.commons.io.Charsets; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.DoubleWritable; -import org.apache.mahout.common.CommandLineUtil; -import org.apache.mahout.common.IntPairWritable; -import org.apache.mahout.common.Pair; -import org.apache.mahout.common.commandline.DefaultOptionCreator; -import org.apache.mahout.common.iterator.sequencefile.PathType; -import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable; -import org.apache.mahout.utils.vectors.VectorHelper; - -/** - * Class to print out the top K words for each topic. 
- */ -public final class LDAPrintTopics { - - private LDAPrintTopics() { } - - // Expands the queue list to have a Queue for topic K - private static void ensureQueueSize(Collection<Queue<Pair<String,Double>>> queues, int k) { - for (int i = queues.size(); i <= k; ++i) { - queues.add(new PriorityQueue<Pair<String,Double>>()); - } - } - - public static void main(String[] args) throws Exception { - DefaultOptionBuilder obuilder = new DefaultOptionBuilder(); - ArgumentBuilder abuilder = new ArgumentBuilder(); - GroupBuilder gbuilder = new GroupBuilder(); - - Option inputOpt = DefaultOptionCreator.inputOption().create(); - - Option dictOpt = obuilder.withLongName("dict").withRequired(true).withArgument( - abuilder.withName("dict").withMinimum(1).withMaximum(1).create()).withDescription( - "Dictionary to read in, in the same format as one created by " - + "org.apache.mahout.utils.vectors.lucene.Driver").withShortName("d").create(); - - Option outOpt = DefaultOptionCreator.outputOption().create(); - - Option wordOpt = obuilder.withLongName("words").withRequired(false).withArgument( - abuilder.withName("words").withMinimum(0).withMaximum(1).withDefault("20").create()).withDescription( - "Number of words to print").withShortName("w").create(); - Option dictTypeOpt = obuilder.withLongName("dictionaryType").withRequired(false).withArgument( - abuilder.withName("dictionaryType").withMinimum(1).withMaximum(1).create()).withDescription( - "The dictionary file type (text|sequencefile)").withShortName("dt").create(); - Option helpOpt = obuilder.withLongName("help").withDescription("Print out help").withShortName("h") - .create(); - - Group group = gbuilder.withName("Options").withOption(dictOpt).withOption(outOpt).withOption(wordOpt) - .withOption(inputOpt).withOption(dictTypeOpt).create(); - try { - Parser parser = new Parser(); - parser.setGroup(group); - CommandLine cmdLine = parser.parse(args); - - if (cmdLine.hasOption(helpOpt)) { - CommandLineUtil.printHelp(group); - return; - } - - String input = cmdLine.getValue(inputOpt).toString(); - String dictFile = cmdLine.getValue(dictOpt).toString(); - int numWords = 20; - if (cmdLine.hasOption(wordOpt)) { - numWords = Integer.parseInt(cmdLine.getValue(wordOpt).toString()); - } - Configuration config = new Configuration(); - - String dictionaryType = "text"; - if (cmdLine.hasOption(dictTypeOpt)) { - dictionaryType = cmdLine.getValue(dictTypeOpt).toString(); - } - - List<String> wordList; - if ("text".equals(dictionaryType)) { - wordList = Arrays.asList(VectorHelper.loadTermDictionary(new File(dictFile))); - } else if ("sequencefile".equals(dictionaryType)) { - wordList = Arrays.asList(VectorHelper.loadTermDictionary(config, dictFile)); - } else { - throw new IllegalArgumentException("Invalid dictionary format"); - } - - List<Queue<Pair<String,Double>>> topWords = topWordsForTopics(input, config, wordList, numWords); - - File output = null; - if (cmdLine.hasOption(outOpt)) { - output = new File(cmdLine.getValue(outOpt).toString()); - if (!output.exists() && !output.mkdirs()) { - throw new IOException("Could not create directory: " + output); - } - } - printTopWords(topWords, output); - } catch (OptionException e) { - CommandLineUtil.printHelp(group); - throw e; - } - } - - // Adds the word if the queue is below capacity, or the score is high enough - private static void maybeEnqueue(Queue<Pair<String,Double>> q, String word, double score, int numWordsToPrint) { - if (q.size() >= numWordsToPrint && score > q.peek().getSecond()) { - q.poll(); - } - if 
(q.size() < numWordsToPrint) { - q.add(new Pair<>(word, score)); - } - } - - private static void printTopWords(List<Queue<Pair<String,Double>>> topWords, File outputDir) - throws IOException { - for (int i = 0; i < topWords.size(); ++i) { - Collection<Pair<String,Double>> topK = topWords.get(i); - Writer out = null; - boolean printingToSystemOut = false; - try { - if (outputDir != null) { - out = new OutputStreamWriter(new FileOutputStream(new File(outputDir, "topic_" + i)), Charsets.UTF_8); - } else { - out = new OutputStreamWriter(System.out, Charsets.UTF_8); - printingToSystemOut = true; - out.write("Topic " + i); - out.write('\n'); - out.write("==========="); - out.write('\n'); - } - List<Pair<String,Double>> topKasList = new ArrayList<>(topK.size()); - for (Pair<String,Double> wordWithScore : topK) { - topKasList.add(wordWithScore); - } - Collections.sort(topKasList, new Comparator<Pair<String,Double>>() { - @Override - public int compare(Pair<String,Double> pair1, Pair<String,Double> pair2) { - return pair2.getSecond().compareTo(pair1.getSecond()); - } - }); - for (Pair<String,Double> wordWithScore : topKasList) { - out.write(wordWithScore.getFirst() + " [p(" + wordWithScore.getFirst() + "|topic_" + i + ") = " - + wordWithScore.getSecond()); - out.write('\n'); - } - } finally { - if (!printingToSystemOut) { - Closeables.close(out, false); - } else { - out.flush(); - } - } - } - } - - private static List<Queue<Pair<String,Double>>> topWordsForTopics(String dir, - Configuration job, - List<String> wordList, - int numWordsToPrint) { - List<Queue<Pair<String,Double>>> queues = new ArrayList<>(); - Map<Integer,Double> expSums = new HashMap<>(); - for (Pair<IntPairWritable,DoubleWritable> record - : new SequenceFileDirIterable<IntPairWritable, DoubleWritable>( - new Path(dir, "part-*"), PathType.GLOB, null, null, true, job)) { - IntPairWritable key = record.getFirst(); - int topic = key.getFirst(); - int word = key.getSecond(); - ensureQueueSize(queues, topic); - if (word >= 0 && topic >= 0) { - double score = record.getSecond().get(); - if (expSums.get(topic) == null) { - expSums.put(topic, 0.0); - } - expSums.put(topic, expSums.get(topic) + Math.exp(score)); - String realWord = wordList.get(word); - maybeEnqueue(queues.get(topic), realWord, score, numWordsToPrint); - } - } - for (int i = 0; i < queues.size(); i++) { - Queue<Pair<String,Double>> queue = queues.get(i); - Queue<Pair<String,Double>> newQueue = new PriorityQueue<>(queue.size()); - double norm = expSums.get(i); - for (Pair<String,Double> pair : queue) { - newQueue.add(new Pair<>(pair.getFirst(), Math.exp(pair.getSecond()) / norm)); - } - queues.set(i, newQueue); - } - return queues; - } -} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/text/MailArchivesClusteringAnalyzer.java ---------------------------------------------------------------------- diff --git a/integration/src/main/java/org/apache/mahout/text/MailArchivesClusteringAnalyzer.java b/integration/src/main/java/org/apache/mahout/text/MailArchivesClusteringAnalyzer.java deleted file mode 100644 index 12ed471..0000000 --- a/integration/src/main/java/org/apache/mahout/text/MailArchivesClusteringAnalyzer.java +++ /dev/null @@ -1,164 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.mahout.text; - -import org.apache.lucene.analysis.TokenFilter; -import org.apache.lucene.analysis.TokenStream; -import org.apache.lucene.analysis.Tokenizer; -import org.apache.lucene.analysis.core.LowerCaseFilter; -import org.apache.lucene.analysis.core.StopFilter; -import org.apache.lucene.analysis.en.PorterStemFilter; -import org.apache.lucene.analysis.miscellaneous.ASCIIFoldingFilter; -import org.apache.lucene.analysis.standard.StandardFilter; -import org.apache.lucene.analysis.standard.StandardTokenizer; -import org.apache.lucene.analysis.tokenattributes.CharTermAttribute; -import org.apache.lucene.analysis.util.CharArraySet; -import org.apache.lucene.analysis.util.StopwordAnalyzerBase; - -import java.io.IOException; -import java.util.Arrays; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -/** - * Custom Lucene Analyzer designed for aggressive feature reduction - * for clustering the ASF Mail Archives using an extended set of - * stop words, excluding non-alpha-numeric tokens, and porter stemming. - */ -public final class MailArchivesClusteringAnalyzer extends StopwordAnalyzerBase { - // extended set of stop words composed of common mail terms like "hi", - // HTML tags, and Java keywords asmany of the messages in the archives - // are subversion check-in notifications - - private static final CharArraySet STOP_SET = new CharArraySet(Arrays.asList( - "3d","7bit","a0","about","above","abstract","across","additional","after", - "afterwards","again","against","align","all","almost","alone","along", - "already","also","although","always","am","among","amongst","amoungst", - "amount","an","and","another","any","anybody","anyhow","anyone","anything", - "anyway","anywhere","are","arial","around","as","ascii","assert","at", - "back","background","base64","bcc","be","became","because","become","becomes", - "becoming","been","before","beforehand","behind","being","below","beside", - "besides","between","beyond","bgcolor","blank","blockquote","body","boolean", - "border","both","br","break","but","by","can","cannot","cant","case","catch", - "cc","cellpadding","cellspacing","center","char","charset","cheers","class", - "co","color","colspan","com","con","const","continue","could","couldnt", - "cry","css","de","dear","default","did","didnt","different","div","do", - "does","doesnt","done","dont","double","down","due","during","each","eg", - "eight","either","else","elsewhere","empty","encoding","enough","enum", - "etc","eu","even","ever","every","everyone","everything","everywhere", - "except","extends","face","family","few","ffffff","final","finally","float", - "font","for","former","formerly","fri","from","further","get","give","go", - "good","got","goto","gt","h1","ha","had","has","hasnt","have","he","head", - "height","hello","helvetica","hence","her","here","hereafter","hereby", - "herein","hereupon","hers","herself","hi","him","himself","his","how", - 
"however","hr","href","html","http","https","id","ie","if","ill","im", - "image","img","implements","import","in","inc","instanceof","int","interface", - "into","is","isnt","iso-8859-1","it","its","itself","ive","just","keep", - "last","latter","latterly","least","left","less","li","like","long","look", - "lt","ltd","mail","mailto","many","margin","may","me","meanwhile","message", - "meta","might","mill","mine","mon","more","moreover","most","mostly","mshtml", - "mso","much","must","my","myself","name","namely","native","nbsp","need", - "neither","never","nevertheless","new","next","nine","no","nobody","none", - "noone","nor","not","nothing","now","nowhere","null","of","off","often", - "ok","on","once","only","onto","or","org","other","others","otherwise", - "our","ours","ourselves","out","over","own","package","pad","per","perhaps", - "plain","please","pm","printable","private","protected","public","put", - "quot","quote","r1","r2","rather","re","really","regards","reply","return", - "right","said","same","sans","sat","say","saying","see","seem","seemed", - "seeming","seems","serif","serious","several","she","short","should","show", - "side","since","sincere","six","sixty","size","so","solid","some","somehow", - "someone","something","sometime","sometimes","somewhere","span","src", - "static","still","strictfp","string","strong","style","stylesheet","subject", - "such","sun","super","sure","switch","synchronized","table","take","target", - "td","text","th","than","thanks","that","the","their","them","themselves", - "then","thence","there","thereafter","thereby","therefore","therein","thereupon", - "these","they","thick","thin","think","third","this","those","though", - "three","through","throughout","throw","throws","thru","thu","thus","tm", - "to","together","too","top","toward","towards","tr","transfer","transient", - "try","tue","type","ul","un","under","unsubscribe","until","up","upon", - "us","use","used","uses","using","valign","verdana","very","via","void", - "volatile","want","was","we","wed","weight","well","were","what","whatever", - "when","whence","whenever","where","whereafter","whereas","whereby","wherein", - "whereupon","wherever","whether","which","while","whither","who","whoever", - "whole","whom","whose","why","width","will","with","within","without", - "wont","would","wrote","www","yes","yet","you","your","yours","yourself", - "yourselves" - ), false); - - // Regex used to exclude non-alpha-numeric tokens - private static final Pattern ALPHA_NUMERIC = Pattern.compile("^[a-z][a-z0-9_]+$"); - private static final Matcher MATCHER = ALPHA_NUMERIC.matcher(""); - - public MailArchivesClusteringAnalyzer() { - super(STOP_SET); - } - - public MailArchivesClusteringAnalyzer(CharArraySet stopSet) { - super(stopSet); - } - - @Override - protected TokenStreamComponents createComponents(String fieldName) { - Tokenizer tokenizer = new StandardTokenizer(); - TokenStream result = new StandardFilter(tokenizer); - result = new LowerCaseFilter(result); - result = new ASCIIFoldingFilter(result); - result = new AlphaNumericMaxLengthFilter(result); - result = new StopFilter(result, STOP_SET); - result = new PorterStemFilter(result); - return new TokenStreamComponents(tokenizer, result); - } - - /** - * Matches alpha-numeric tokens between 2 and 40 chars long. 
- */ - static class AlphaNumericMaxLengthFilter extends TokenFilter { - private final CharTermAttribute termAtt; - private final char[] output = new char[28]; - - AlphaNumericMaxLengthFilter(TokenStream in) { - super(in); - termAtt = addAttribute(CharTermAttribute.class); - } - - @Override - public final boolean incrementToken() throws IOException { - // return the first alpha-numeric token between 2 and 40 length - while (input.incrementToken()) { - int length = termAtt.length(); - if (length >= 2 && length <= 28) { - char[] buf = termAtt.buffer(); - int at = 0; - for (int c = 0; c < length; c++) { - char ch = buf[c]; - if (ch != '\'') { - output[at++] = ch; - } - } - String term = new String(output, 0, at); - MATCHER.reset(term); - if (MATCHER.matches() && !term.startsWith("a0")) { - termAtt.setEmpty(); - termAtt.append(term); - return true; - } - } - } - return false; - } - } -} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/text/MultipleTextFileInputFormat.java ---------------------------------------------------------------------- diff --git a/integration/src/main/java/org/apache/mahout/text/MultipleTextFileInputFormat.java b/integration/src/main/java/org/apache/mahout/text/MultipleTextFileInputFormat.java deleted file mode 100644 index 44df006..0000000 --- a/integration/src/main/java/org/apache/mahout/text/MultipleTextFileInputFormat.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.text; - -import java.io.IOException; - -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat; -import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader; -import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit; - -/** - * - * Used in combining a large number of text files into one text input reader - * along with the WholeFileRecordReader class. 
- * - */ -public class MultipleTextFileInputFormat extends CombineFileInputFormat<IntWritable, BytesWritable> { - - @Override - public RecordReader<IntWritable, BytesWritable> createRecordReader(InputSplit inputSplit, - TaskAttemptContext taskAttemptContext) - throws IOException { - return new CombineFileRecordReader<>((CombineFileSplit) inputSplit, - taskAttemptContext, WholeFileRecordReader.class); - } -} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java ---------------------------------------------------------------------- diff --git a/integration/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java b/integration/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java deleted file mode 100644 index 37ebc44..0000000 --- a/integration/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java +++ /dev/null @@ -1,67 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.text; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.mahout.common.iterator.FileLineIterable; -import org.apache.mahout.utils.io.ChunkedWriter; - -import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.Charset; -import java.util.Map; - -/** - * Default parser for parsing text into sequence files. - */ -public final class PrefixAdditionFilter extends SequenceFilesFromDirectoryFilter { - - public PrefixAdditionFilter(Configuration conf, - String keyPrefix, - Map<String, String> options, - ChunkedWriter writer, - Charset charset, - FileSystem fs) { - super(conf, keyPrefix, options, writer, charset, fs); - } - - @Override - protected void process(FileStatus fst, Path current) throws IOException { - FileSystem fs = getFs(); - ChunkedWriter writer = getWriter(); - if (fst.isDir()) { - String dirPath = getPrefix() + Path.SEPARATOR + current.getName() + Path.SEPARATOR + fst.getPath().getName(); - fs.listStatus(fst.getPath(), - new PrefixAdditionFilter(getConf(), dirPath, getOptions(), writer, getCharset(), fs)); - } else { - try (InputStream in = fs.open(fst.getPath())){ - StringBuilder file = new StringBuilder(); - for (String aFit : new FileLineIterable(in, getCharset(), false)) { - file.append(aFit).append('\n'); - } - String name = current.getName().equals(fst.getPath().getName()) - ? 
current.getName() - : current.getName() + Path.SEPARATOR + fst.getPath().getName(); - writer.write(getPrefix() + Path.SEPARATOR + name, file.toString()); - } - } - } -} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java ---------------------------------------------------------------------- diff --git a/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java b/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java deleted file mode 100644 index 311ab8d..0000000 --- a/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java +++ /dev/null @@ -1,214 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.text; - -import java.io.IOException; -import java.nio.charset.Charset; -import java.util.HashMap; -import java.util.Map; - -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; -import org.apache.hadoop.util.ToolRunner; -import org.apache.mahout.common.AbstractJob; -import org.apache.mahout.common.ClassUtils; -import org.apache.mahout.common.HadoopUtil; -import org.apache.mahout.common.commandline.DefaultOptionCreator; -import org.apache.mahout.utils.io.ChunkedWriter; - -/** - * Converts a directory of text documents into SequenceFiles of Specified chunkSize. This class takes in a - * parent directory containing sub folders of text documents and recursively reads the files and creates the - * {@link org.apache.hadoop.io.SequenceFile}s of docid => content. The docid is set as the relative path of the - * document from the parent directory prepended with a specified prefix. You can also specify the input encoding - * of the text files. The content of the output SequenceFiles are encoded as UTF-8 text. 
- */ -public class SequenceFilesFromDirectory extends AbstractJob { - - private static final String PREFIX_ADDITION_FILTER = PrefixAdditionFilter.class.getName(); - - private static final String[] CHUNK_SIZE_OPTION = {"chunkSize", "chunk"}; - public static final String[] FILE_FILTER_CLASS_OPTION = {"fileFilterClass", "filter"}; - private static final String[] CHARSET_OPTION = {"charset", "c"}; - - private static final int MAX_JOB_SPLIT_LOCATIONS = 1000000; - - public static final String[] KEY_PREFIX_OPTION = {"keyPrefix", "prefix"}; - public static final String BASE_INPUT_PATH = "baseinputpath"; - - public static void main(String[] args) throws Exception { - ToolRunner.run(new SequenceFilesFromDirectory(), args); - } - - /* - * callback main after processing MapReduce parameters - */ - @Override - public int run(String[] args) throws Exception { - addOptions(); - addOption(DefaultOptionCreator.methodOption().create()); - addOption(DefaultOptionCreator.overwriteOption().create()); - - if (parseArguments(args) == null) { - return -1; - } - - Map<String, String> options = parseOptions(); - Path output = getOutputPath(); - if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) { - HadoopUtil.delete(getConf(), output); - } - - if (getOption(DefaultOptionCreator.METHOD_OPTION, - DefaultOptionCreator.MAPREDUCE_METHOD).equals(DefaultOptionCreator.SEQUENTIAL_METHOD)) { - runSequential(getConf(), getInputPath(), output, options); - } else { - runMapReduce(getInputPath(), output); - } - - return 0; - } - - private int runSequential(Configuration conf, Path input, Path output, Map<String, String> options) - throws IOException, InterruptedException, NoSuchMethodException { - // Running sequentially - Charset charset = Charset.forName(getOption(CHARSET_OPTION[0])); - String keyPrefix = getOption(KEY_PREFIX_OPTION[0]); - FileSystem fs = FileSystem.get(input.toUri(), conf); - - try (ChunkedWriter writer = new ChunkedWriter(conf, Integer.parseInt(options.get(CHUNK_SIZE_OPTION[0])), output)) { - SequenceFilesFromDirectoryFilter pathFilter; - String fileFilterClassName = options.get(FILE_FILTER_CLASS_OPTION[0]); - if (PrefixAdditionFilter.class.getName().equals(fileFilterClassName)) { - pathFilter = new PrefixAdditionFilter(conf, keyPrefix, options, writer, charset, fs); - } else { - pathFilter = ClassUtils.instantiateAs(fileFilterClassName, SequenceFilesFromDirectoryFilter.class, - new Class[] {Configuration.class, String.class, Map.class, ChunkedWriter.class, Charset.class, FileSystem.class}, - new Object[] {conf, keyPrefix, options, writer, charset, fs}); - } - fs.listStatus(input, pathFilter); - } - return 0; - } - - private int runMapReduce(Path input, Path output) throws IOException, ClassNotFoundException, InterruptedException { - - int chunkSizeInMB = 64; - if (hasOption(CHUNK_SIZE_OPTION[0])) { - chunkSizeInMB = Integer.parseInt(getOption(CHUNK_SIZE_OPTION[0])); - } - - String keyPrefix = null; - if (hasOption(KEY_PREFIX_OPTION[0])) { - keyPrefix = getOption(KEY_PREFIX_OPTION[0]); - } - - String fileFilterClassName = null; - if (hasOption(FILE_FILTER_CLASS_OPTION[0])) { - fileFilterClassName = getOption(FILE_FILTER_CLASS_OPTION[0]); - } - - PathFilter pathFilter = null; - // Prefix Addition is presently handled in the Mapper and unlike runsequential() - // need not be done via a pathFilter - if (!StringUtils.isBlank(fileFilterClassName) && !PrefixAdditionFilter.class.getName().equals(fileFilterClassName)) { - try { - pathFilter = (PathFilter) Class.forName(fileFilterClassName).newInstance(); - } 
catch (InstantiationException | IllegalAccessException e) { - throw new IllegalStateException(e); - } - } - - // Prepare Job for submission. - Job job = prepareJob(input, output, MultipleTextFileInputFormat.class, - SequenceFilesFromDirectoryMapper.class, Text.class, Text.class, - SequenceFileOutputFormat.class, "SequenceFilesFromDirectory"); - - Configuration jobConfig = job.getConfiguration(); - jobConfig.set(KEY_PREFIX_OPTION[0], keyPrefix); - jobConfig.set(FILE_FILTER_CLASS_OPTION[0], fileFilterClassName); - - FileSystem fs = FileSystem.get(jobConfig); - FileStatus fsFileStatus = fs.getFileStatus(input); - - String inputDirList; - if (pathFilter != null) { - inputDirList = HadoopUtil.buildDirList(fs, fsFileStatus, pathFilter); - } else { - inputDirList = HadoopUtil.buildDirList(fs, fsFileStatus); - } - - jobConfig.set(BASE_INPUT_PATH, input.toString()); - - long chunkSizeInBytes = chunkSizeInMB * 1024 * 1024; - - // set the max split locations, otherwise we get nasty debug stuff - jobConfig.set("mapreduce.job.max.split.locations", String.valueOf(MAX_JOB_SPLIT_LOCATIONS)); - - FileInputFormat.setInputPaths(job, inputDirList); - // need to set this to a multiple of the block size, or no split happens - FileInputFormat.setMaxInputSplitSize(job, chunkSizeInBytes); - FileOutputFormat.setCompressOutput(job, true); - - boolean succeeded = job.waitForCompletion(true); - if (!succeeded) { - return -1; - } - return 0; - } - - /** - * Override this method in order to add additional options to the command line of the SequenceFileFromDirectory job. - * Do not forget to call super() otherwise all standard options (input/output dirs etc) will not be available. - */ - protected void addOptions() { - addInputOption(); - addOutputOption(); - addOption(DefaultOptionCreator.overwriteOption().create()); - addOption(DefaultOptionCreator.methodOption().create()); - addOption(CHUNK_SIZE_OPTION[0], CHUNK_SIZE_OPTION[1], "The chunkSize in MegaBytes. Defaults to 64", "64"); - addOption(FILE_FILTER_CLASS_OPTION[0], FILE_FILTER_CLASS_OPTION[1], - "The name of the class to use for file parsing. Default: " + PREFIX_ADDITION_FILTER, PREFIX_ADDITION_FILTER); - addOption(KEY_PREFIX_OPTION[0], KEY_PREFIX_OPTION[1], "The prefix to be prepended to the key", ""); - addOption(CHARSET_OPTION[0], CHARSET_OPTION[1], - "The name of the character encoding of the input files. Default to UTF-8", "UTF-8"); - } - - /** - * Override this method in order to parse your additional options from the command line. Do not forget to call - * super() otherwise standard options (input/output dirs etc) will not be available. 
- * - * @return Map of options - */ - protected Map<String, String> parseOptions() { - Map<String, String> options = new HashMap<>(); - options.put(CHUNK_SIZE_OPTION[0], getOption(CHUNK_SIZE_OPTION[0])); - options.put(FILE_FILTER_CLASS_OPTION[0], getOption(FILE_FILTER_CLASS_OPTION[0])); - options.put(CHARSET_OPTION[0], getOption(CHARSET_OPTION[0])); - return options; - } -} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryFilter.java ---------------------------------------------------------------------- diff --git a/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryFilter.java b/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryFilter.java deleted file mode 100644 index 6e4bd64..0000000 --- a/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryFilter.java +++ /dev/null @@ -1,99 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.text; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; -import org.apache.mahout.utils.io.ChunkedWriter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.nio.charset.Charset; -import java.util.Map; - -/** - * Implement this interface if you wish to extend SequenceFilesFromDirectory with your own parsing logic. 
- */ -public abstract class SequenceFilesFromDirectoryFilter implements PathFilter { - private static final Logger log = LoggerFactory.getLogger(SequenceFilesFromDirectoryFilter.class); - - private final String prefix; - private final ChunkedWriter writer; - private final Charset charset; - private final FileSystem fs; - private final Map<String, String> options; - private final Configuration conf; - - protected SequenceFilesFromDirectoryFilter(Configuration conf, - String keyPrefix, - Map<String, String> options, - ChunkedWriter writer, - Charset charset, - FileSystem fs) { - this.prefix = keyPrefix; - this.writer = writer; - this.charset = charset; - this.fs = fs; - this.options = options; - this.conf = conf; - } - - protected final String getPrefix() { - return prefix; - } - - protected final ChunkedWriter getWriter() { - return writer; - } - - protected final Charset getCharset() { - return charset; - } - - protected final FileSystem getFs() { - return fs; - } - - protected final Map<String, String> getOptions() { - return options; - } - - protected final Configuration getConf() { - return conf; - } - - @Override - public final boolean accept(Path current) { - log.debug("CURRENT: {}", current.getName()); - try { - for (FileStatus fst : fs.listStatus(current)) { - log.debug("CHILD: {}", fst.getPath().getName()); - process(fst, current); - } - } catch (IOException ioe) { - throw new IllegalStateException(ioe); - } - return false; - } - - protected abstract void process(FileStatus in, Path current) throws IOException; -} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java ---------------------------------------------------------------------- diff --git a/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java b/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java deleted file mode 100644 index 40df3c2..0000000 --- a/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.mahout.text; - -import java.io.IOException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit; -import org.apache.mahout.common.HadoopUtil; - -import static org.apache.mahout.text.SequenceFilesFromDirectory.KEY_PREFIX_OPTION; - -/** - * Map class for SequenceFilesFromDirectory MR job - */ -public class SequenceFilesFromDirectoryMapper extends Mapper<IntWritable, BytesWritable, Text, Text> { - - private String keyPrefix; - private Text fileValue = new Text(); - - @Override - protected void setup(Context context) throws IOException, InterruptedException { - super.setup(context); - this.keyPrefix = context.getConfiguration().get(KEY_PREFIX_OPTION[0], ""); - } - - public void map(IntWritable key, BytesWritable value, Context context) - throws IOException, InterruptedException { - - Configuration configuration = context.getConfiguration(); - Path filePath = ((CombineFileSplit) context.getInputSplit()).getPath(key.get()); - String relativeFilePath = HadoopUtil.calcRelativeFilePath(configuration, filePath); - - String filename = this.keyPrefix.length() > 0 ? - this.keyPrefix + Path.SEPARATOR + relativeFilePath : - Path.SEPARATOR + relativeFilePath; - - fileValue.set(value.getBytes(), 0, value.getBytes().length); - context.write(new Text(filename), fileValue); - } -} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java ---------------------------------------------------------------------- diff --git a/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java b/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java deleted file mode 100644 index c17cc12..0000000 --- a/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java +++ /dev/null @@ -1,369 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.mahout.text; - -import org.apache.commons.io.DirectoryWalker; -import org.apache.commons.io.comparator.CompositeFileComparator; -import org.apache.commons.io.comparator.DirectoryFileComparator; -import org.apache.commons.io.comparator.PathFileComparator; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; -import org.apache.hadoop.util.ToolRunner; -import org.apache.mahout.common.AbstractJob; -import org.apache.mahout.common.HadoopUtil; -import org.apache.mahout.common.commandline.DefaultOptionCreator; -import org.apache.mahout.utils.email.MailOptions; -import org.apache.mahout.utils.email.MailProcessor; -import org.apache.mahout.utils.io.ChunkedWriter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.nio.charset.Charset; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Comparator; -import java.util.Deque; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.regex.Pattern; - -/** - * Converts a directory of gzipped mail archives into SequenceFiles of specified - * chunkSize. This class is similar to {@link SequenceFilesFromDirectory} except - * it uses block-compressed {@link org.apache.hadoop.io.SequenceFile}s and parses out the subject and - * body text of each mail message into a separate key/value pair. - */ -public final class SequenceFilesFromMailArchives extends AbstractJob { - - private static final Logger log = LoggerFactory.getLogger(SequenceFilesFromMailArchives.class); - - public static final String[] CHUNK_SIZE_OPTION = {"chunkSize", "chunk"}; - public static final String[] KEY_PREFIX_OPTION = {"keyPrefix", "prefix"}; - public static final String[] CHARSET_OPTION = {"charset", "c"}; - public static final String[] SUBJECT_OPTION = {"subject", "s"}; - public static final String[] TO_OPTION = {"to", "to"}; - public static final String[] FROM_OPTION = {"from", "from"}; - public static final String[] REFERENCES_OPTION = {"references", "refs"}; - public static final String[] BODY_OPTION = {"body", "b"}; - public static final String[] STRIP_QUOTED_OPTION = {"stripQuoted", "q"}; - public static final String[] QUOTED_REGEX_OPTION = {"quotedRegex", "regex"}; - public static final String[] SEPARATOR_OPTION = {"separator", "sep"}; - public static final String[] BODY_SEPARATOR_OPTION = {"bodySeparator", "bodySep"}; - public static final String BASE_INPUT_PATH = "baseinputpath"; - - private static final int MAX_JOB_SPLIT_LOCATIONS = 1000000; - - public void createSequenceFiles(MailOptions options) throws IOException { - try (ChunkedWriter writer = - new ChunkedWriter(getConf(), options.getChunkSize(), new Path(options.getOutputDir()))){ - MailProcessor processor = new MailProcessor(options, options.getPrefix(), writer); - if (options.getInput().isDirectory()) { - PrefixAdditionDirectoryWalker walker = new PrefixAdditionDirectoryWalker(processor, writer); - walker.walk(options.getInput()); - log.info("Parsed {} messages from {}", walker.getMessageCount(), options.getInput().getAbsolutePath()); - } else { - long start = System.currentTimeMillis(); - long 
cnt = processor.parseMboxLineByLine(options.getInput()); - long finish = System.currentTimeMillis(); - log.info("Parsed {} messages from {} in time: {}", cnt, options.getInput().getAbsolutePath(), finish - start); - } - } - } - - private static class PrefixAdditionDirectoryWalker extends DirectoryWalker<Object> { - - @SuppressWarnings("unchecked") - private static final Comparator<File> FILE_COMPARATOR = new CompositeFileComparator( - DirectoryFileComparator.DIRECTORY_REVERSE, PathFileComparator.PATH_COMPARATOR); - - private final Deque<MailProcessor> processors = new ArrayDeque<>(); - private final ChunkedWriter writer; - private final Deque<Long> messageCounts = new ArrayDeque<>(); - - public PrefixAdditionDirectoryWalker(MailProcessor processor, ChunkedWriter writer) { - processors.addFirst(processor); - this.writer = writer; - messageCounts.addFirst(0L); - } - - public void walk(File startDirectory) throws IOException { - super.walk(startDirectory, null); - } - - public long getMessageCount() { - return messageCounts.getFirst(); - } - - @Override - protected void handleDirectoryStart(File current, int depth, Collection<Object> results) throws IOException { - if (depth > 0) { - log.info("At {}", current.getAbsolutePath()); - MailProcessor processor = processors.getFirst(); - MailProcessor subDirProcessor = new MailProcessor(processor.getOptions(), processor.getPrefix() - + File.separator + current.getName(), writer); - processors.push(subDirProcessor); - messageCounts.push(0L); - } - } - - @Override - protected File[] filterDirectoryContents(File directory, int depth, File[] files) throws IOException { - Arrays.sort(files, FILE_COMPARATOR); - return files; - } - - @Override - protected void handleFile(File current, int depth, Collection<Object> results) throws IOException { - MailProcessor processor = processors.getFirst(); - long currentDirMessageCount = messageCounts.pop(); - try { - currentDirMessageCount += processor.parseMboxLineByLine(current); - } catch (IOException e) { - throw new IllegalStateException("Error processing " + current, e); - } - messageCounts.push(currentDirMessageCount); - } - - @Override - protected void handleDirectoryEnd(File current, int depth, Collection<Object> results) throws IOException { - if (depth > 0) { - final long currentDirMessageCount = messageCounts.pop(); - log.info("Parsed {} messages from directory {}", currentDirMessageCount, current.getAbsolutePath()); - - processors.pop(); - - // aggregate message counts - long parentDirMessageCount = messageCounts.pop(); - parentDirMessageCount += currentDirMessageCount; - messageCounts.push(parentDirMessageCount); - } - } - } - - public static void main(String[] args) throws Exception { - ToolRunner.run(new Configuration(), new SequenceFilesFromMailArchives(), args); - } - - @Override - public int run(String[] args) throws Exception { - addInputOption(); - addOutputOption(); - addOption(DefaultOptionCreator.methodOption().create()); - - addOption(CHUNK_SIZE_OPTION[0], CHUNK_SIZE_OPTION[1], "The chunkSize in MegaBytes. Defaults to 64", "64"); - addOption(KEY_PREFIX_OPTION[0], KEY_PREFIX_OPTION[1], "The prefix to be prepended to the key", ""); - addOption(CHARSET_OPTION[0], CHARSET_OPTION[1], - "The name of the character encoding of the input files. Default to UTF-8", "UTF-8"); - addFlag(SUBJECT_OPTION[0], SUBJECT_OPTION[1], "Include the Mail subject as part of the text. Default is false"); - addFlag(TO_OPTION[0], TO_OPTION[1], "Include the to field in the text. 
Default is false"); - addFlag(FROM_OPTION[0], FROM_OPTION[1], "Include the from field in the text. Default is false"); - addFlag(REFERENCES_OPTION[0], REFERENCES_OPTION[1], - "Include the references field in the text. Default is false"); - addFlag(BODY_OPTION[0], BODY_OPTION[1], "Include the body in the output. Default is false"); - addFlag(STRIP_QUOTED_OPTION[0], STRIP_QUOTED_OPTION[1], - "Strip (remove) quoted email text in the body. Default is false"); - addOption(QUOTED_REGEX_OPTION[0], QUOTED_REGEX_OPTION[1], - "Specify the regex that identifies quoted text. " - + "Default is to look for > or | at the beginning of the line."); - addOption(SEPARATOR_OPTION[0], SEPARATOR_OPTION[1], - "The separator to use between metadata items (to, from, etc.). Default is \\n", "\n"); - addOption(BODY_SEPARATOR_OPTION[0], BODY_SEPARATOR_OPTION[1], - "The separator to use between lines in the body. Default is \\n. " - + "Useful to change if you wish to have the message be on one line", "\n"); - - addOption(DefaultOptionCreator.helpOption()); - Map<String, List<String>> parsedArgs = parseArguments(args); - if (parsedArgs == null) { - return -1; - } - File input = getInputFile(); - String outputDir = getOutputPath().toString(); - - int chunkSize = 64; - if (hasOption(CHUNK_SIZE_OPTION[0])) { - chunkSize = Integer.parseInt(getOption(CHUNK_SIZE_OPTION[0])); - } - - String prefix = ""; - if (hasOption(KEY_PREFIX_OPTION[0])) { - prefix = getOption(KEY_PREFIX_OPTION[0]); - } - - Charset charset = Charset.forName(getOption(CHARSET_OPTION[0])); - MailOptions options = new MailOptions(); - options.setInput(input); - options.setOutputDir(outputDir); - options.setPrefix(prefix); - options.setChunkSize(chunkSize); - options.setCharset(charset); - - List<Pattern> patterns = new ArrayList<>(5); - // patternOrder is used downstream so that we can know what order the text - // is in instead of encoding it in the string, which - // would require more processing later to remove it pre feature selection. 
- Map<String, Integer> patternOrder = new HashMap<>(); - int order = 0; - if (hasOption(FROM_OPTION[0])) { - patterns.add(MailProcessor.FROM_PREFIX); - patternOrder.put(MailOptions.FROM, order++); - } - if (hasOption(TO_OPTION[0])) { - patterns.add(MailProcessor.TO_PREFIX); - patternOrder.put(MailOptions.TO, order++); - } - if (hasOption(REFERENCES_OPTION[0])) { - patterns.add(MailProcessor.REFS_PREFIX); - patternOrder.put(MailOptions.REFS, order++); - } - if (hasOption(SUBJECT_OPTION[0])) { - patterns.add(MailProcessor.SUBJECT_PREFIX); - patternOrder.put(MailOptions.SUBJECT, order += 1); - } - options.setStripQuotedText(hasOption(STRIP_QUOTED_OPTION[0])); - - options.setPatternsToMatch(patterns.toArray(new Pattern[patterns.size()])); - options.setPatternOrder(patternOrder); - options.setIncludeBody(hasOption(BODY_OPTION[0])); - - if (hasOption(SEPARATOR_OPTION[0])) { - options.setSeparator(getOption(SEPARATOR_OPTION[0])); - } else { - options.setSeparator("\n"); - } - - if (hasOption(BODY_SEPARATOR_OPTION[0])) { - options.setBodySeparator(getOption(BODY_SEPARATOR_OPTION[0])); - } - - if (hasOption(QUOTED_REGEX_OPTION[0])) { - options.setQuotedTextPattern(Pattern.compile(getOption(QUOTED_REGEX_OPTION[0]))); - } - - if (getOption(DefaultOptionCreator.METHOD_OPTION, - DefaultOptionCreator.MAPREDUCE_METHOD).equals(DefaultOptionCreator.SEQUENTIAL_METHOD)) { - runSequential(options); - } else { - runMapReduce(getInputPath(), getOutputPath()); - } - - return 0; - } - - private int runSequential(MailOptions options) - throws IOException, InterruptedException, NoSuchMethodException { - - long start = System.currentTimeMillis(); - createSequenceFiles(options); - long finish = System.currentTimeMillis(); - log.info("Conversion took {}ms", finish - start); - - return 0; - } - - private int runMapReduce(Path input, Path output) throws IOException, InterruptedException, ClassNotFoundException { - - Job job = prepareJob(input, output, MultipleTextFileInputFormat.class, SequenceFilesFromMailArchivesMapper.class, - Text.class, Text.class, SequenceFileOutputFormat.class, "SequentialFilesFromMailArchives"); - - Configuration jobConfig = job.getConfiguration(); - - if (hasOption(KEY_PREFIX_OPTION[0])) { - jobConfig.set(KEY_PREFIX_OPTION[1], getOption(KEY_PREFIX_OPTION[0])); - } - - int chunkSize = 0; - if (hasOption(CHUNK_SIZE_OPTION[0])) { - chunkSize = Integer.parseInt(getOption(CHUNK_SIZE_OPTION[0])); - jobConfig.set(CHUNK_SIZE_OPTION[0], String.valueOf(chunkSize)); - } - - Charset charset; - if (hasOption(CHARSET_OPTION[0])) { - charset = Charset.forName(getOption(CHARSET_OPTION[0])); - jobConfig.set(CHARSET_OPTION[0], charset.displayName()); - } - - if (hasOption(FROM_OPTION[0])) { - jobConfig.set(FROM_OPTION[1], "true"); - } - - if (hasOption(TO_OPTION[0])) { - jobConfig.set(TO_OPTION[1], "true"); - } - - if (hasOption(REFERENCES_OPTION[0])) { - jobConfig.set(REFERENCES_OPTION[1], "true"); - } - - if (hasOption(SUBJECT_OPTION[0])) { - jobConfig.set(SUBJECT_OPTION[1], "true"); - } - - if (hasOption(QUOTED_REGEX_OPTION[0])) { - jobConfig.set(QUOTED_REGEX_OPTION[1], Pattern.compile(getOption(QUOTED_REGEX_OPTION[0])).toString()); - } - - if (hasOption(SEPARATOR_OPTION[0])) { - jobConfig.set(SEPARATOR_OPTION[1], getOption(SEPARATOR_OPTION[0])); - } else { - jobConfig.set(SEPARATOR_OPTION[1], "\n"); - } - - if (hasOption(BODY_OPTION[0])) { - jobConfig.set(BODY_OPTION[1], "true"); - } else { - jobConfig.set(BODY_OPTION[1], "false"); - } - - if (hasOption(BODY_SEPARATOR_OPTION[0])) { - 
jobConfig.set(BODY_SEPARATOR_OPTION[1], getOption(BODY_SEPARATOR_OPTION[0])); - } else { - jobConfig.set(BODY_SEPARATOR_OPTION[1], "\n"); - } - - FileSystem fs = FileSystem.get(jobConfig); - FileStatus fsFileStatus = fs.getFileStatus(inputPath); - - jobConfig.set(BASE_INPUT_PATH, inputPath.toString()); - String inputDirList = HadoopUtil.buildDirList(fs, fsFileStatus); - FileInputFormat.setInputPaths(job, inputDirList); - - long chunkSizeInBytes = chunkSize * 1024 * 1024; - // need to set this to a multiple of the block size, or no split happens - FileInputFormat.setMaxInputSplitSize(job, chunkSizeInBytes); - - // set the max split locations, otherwise we get nasty debug stuff - jobConfig.set("mapreduce.job.max.split.locations", String.valueOf(MAX_JOB_SPLIT_LOCATIONS)); - - boolean succeeded = job.waitForCompletion(true); - if (!succeeded) { - return -1; - } - return 0; - } -} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java ---------------------------------------------------------------------- diff --git a/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java b/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java deleted file mode 100644 index 203e8fb..0000000 --- a/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java +++ /dev/null @@ -1,244 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.mahout.text; - -import com.google.common.base.Joiner; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit; -import org.apache.mahout.common.HadoopUtil; -import org.apache.mahout.common.iterator.FileLineIterable; -import org.apache.mahout.utils.email.MailOptions; -import org.apache.mahout.utils.email.MailProcessor; - -import java.io.ByteArrayInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.Charset; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import static org.apache.mahout.text.SequenceFilesFromMailArchives.BODY_OPTION; -import static org.apache.mahout.text.SequenceFilesFromMailArchives.BODY_SEPARATOR_OPTION; -import static org.apache.mahout.text.SequenceFilesFromMailArchives.CHARSET_OPTION; -import static org.apache.mahout.text.SequenceFilesFromMailArchives.CHUNK_SIZE_OPTION; -import static org.apache.mahout.text.SequenceFilesFromMailArchives.FROM_OPTION; -import static org.apache.mahout.text.SequenceFilesFromMailArchives.KEY_PREFIX_OPTION; -import static org.apache.mahout.text.SequenceFilesFromMailArchives.QUOTED_REGEX_OPTION; -import static org.apache.mahout.text.SequenceFilesFromMailArchives.REFERENCES_OPTION; -import static org.apache.mahout.text.SequenceFilesFromMailArchives.SEPARATOR_OPTION; -import static org.apache.mahout.text.SequenceFilesFromMailArchives.STRIP_QUOTED_OPTION; -import static org.apache.mahout.text.SequenceFilesFromMailArchives.SUBJECT_OPTION; -import static org.apache.mahout.text.SequenceFilesFromMailArchives.TO_OPTION; - -/** - * Map Class for the SequenceFilesFromMailArchives job - */ -public class SequenceFilesFromMailArchivesMapper extends Mapper<IntWritable, BytesWritable, Text, Text> { - - private Text outKey = new Text(); - private Text outValue = new Text(); - - private static final Pattern MESSAGE_START = Pattern.compile( - "^From \\S+@\\S.*\\d{4}$", Pattern.CASE_INSENSITIVE); - private static final Pattern MESSAGE_ID_PREFIX = Pattern.compile( - "^message-id: <(.*)>$", Pattern.CASE_INSENSITIVE); - - private MailOptions options; - - @Override - public void setup(Context context) throws IOException, InterruptedException { - - Configuration configuration = context.getConfiguration(); - - // absorb all of the options into the MailOptions object - this.options = new MailOptions(); - - options.setPrefix(configuration.get(KEY_PREFIX_OPTION[1], "")); - - if (!configuration.get(CHUNK_SIZE_OPTION[0], "").equals("")) { - options.setChunkSize(configuration.getInt(CHUNK_SIZE_OPTION[0], 64)); - } - - if (!configuration.get(CHARSET_OPTION[0], "").equals("")) { - Charset charset = Charset.forName(configuration.get(CHARSET_OPTION[0], "UTF-8")); - options.setCharset(charset); - } else { - Charset charset = Charset.forName("UTF-8"); - options.setCharset(charset); - } - - List<Pattern> patterns = Lists.newArrayListWithCapacity(5); - // patternOrder is used downstream so that we can know what order the - // text is in instead - // of encoding it in the string, which - // would require more processing later to remove it pre feature - // 
selection. - Map<String, Integer> patternOrder = Maps.newHashMap(); - int order = 0; - if (!configuration.get(FROM_OPTION[1], "").equals("")) { - patterns.add(MailProcessor.FROM_PREFIX); - patternOrder.put(MailOptions.FROM, order++); - } - - if (!configuration.get(TO_OPTION[1], "").equals("")) { - patterns.add(MailProcessor.TO_PREFIX); - patternOrder.put(MailOptions.TO, order++); - } - - if (!configuration.get(REFERENCES_OPTION[1], "").equals("")) { - patterns.add(MailProcessor.REFS_PREFIX); - patternOrder.put(MailOptions.REFS, order++); - } - - if (!configuration.get(SUBJECT_OPTION[1], "").equals("")) { - patterns.add(MailProcessor.SUBJECT_PREFIX); - patternOrder.put(MailOptions.SUBJECT, order += 1); - } - - options.setStripQuotedText(configuration.getBoolean(STRIP_QUOTED_OPTION[1], false)); - - options.setPatternsToMatch(patterns.toArray(new Pattern[patterns.size()])); - options.setPatternOrder(patternOrder); - - options.setIncludeBody(configuration.getBoolean(BODY_OPTION[1], false)); - - options.setSeparator("\n"); - if (!configuration.get(SEPARATOR_OPTION[1], "").equals("")) { - options.setSeparator(configuration.get(SEPARATOR_OPTION[1], "")); - } - if (!configuration.get(BODY_SEPARATOR_OPTION[1], "").equals("")) { - options.setBodySeparator(configuration.get(BODY_SEPARATOR_OPTION[1], "")); - } - if (!configuration.get(QUOTED_REGEX_OPTION[1], "").equals("")) { - options.setQuotedTextPattern(Pattern.compile(configuration.get(QUOTED_REGEX_OPTION[1], ""))); - } - - } - - public long parseMailboxLineByLine(String filename, InputStream mailBoxInputStream, Context context) - throws IOException, InterruptedException { - long messageCount = 0; - try { - StringBuilder contents = new StringBuilder(); - StringBuilder body = new StringBuilder(); - Matcher messageIdMatcher = MESSAGE_ID_PREFIX.matcher(""); - Matcher messageBoundaryMatcher = MESSAGE_START.matcher(""); - String[] patternResults = new String[options.getPatternsToMatch().length]; - Matcher[] matches = new Matcher[options.getPatternsToMatch().length]; - for (int i = 0; i < matches.length; i++) { - matches[i] = options.getPatternsToMatch()[i].matcher(""); - } - - String messageId = null; - boolean inBody = false; - Pattern quotedTextPattern = options.getQuotedTextPattern(); - - for (String nextLine : new FileLineIterable(mailBoxInputStream, options.getCharset(), false, filename)) { - if (!options.isStripQuotedText() || !quotedTextPattern.matcher(nextLine).find()) { - for (int i = 0; i < matches.length; i++) { - Matcher matcher = matches[i]; - matcher.reset(nextLine); - if (matcher.matches()) { - patternResults[i] = matcher.group(1); - } - } - - // only start appending body content after we've seen a message ID - if (messageId != null) { - // first, see if we hit the end of the message - messageBoundaryMatcher.reset(nextLine); - if (messageBoundaryMatcher.matches()) { - // done parsing this message ... 
write it out - String key = generateKey(filename, options.getPrefix(), messageId); - // if this ordering changes, then also change - // FromEmailToDictionaryMapper - writeContent(options.getSeparator(), contents, body, patternResults); - - this.outKey.set(key); - this.outValue.set(contents.toString()); - context.write(this.outKey, this.outValue); - contents.setLength(0); // reset the buffer - body.setLength(0); - messageId = null; - inBody = false; - } else { - if (inBody && options.isIncludeBody()) { - if (!nextLine.isEmpty()) { - body.append(nextLine).append(options.getBodySeparator()); - } - } else { - // first empty line we see after reading the message Id - // indicates that we are in the body ... - inBody = nextLine.isEmpty(); - } - } - } else { - if (nextLine.length() > 14) { - messageIdMatcher.reset(nextLine); - if (messageIdMatcher.matches()) { - messageId = messageIdMatcher.group(1); - ++messageCount; - } - } - } - } - } - // write the last message in the file if available - if (messageId != null) { - String key = generateKey(filename, options.getPrefix(), messageId); - writeContent(options.getSeparator(), contents, body, patternResults); - this.outKey.set(key); - this.outValue.set(contents.toString()); - context.write(this.outKey, this.outValue); - contents.setLength(0); // reset the buffer - } - } catch (FileNotFoundException ignored) { - - } - return messageCount; - } - - protected static String generateKey(String mboxFilename, String prefix, String messageId) { - return Joiner.on(Path.SEPARATOR).join(Lists.newArrayList(prefix, mboxFilename, messageId).iterator()); - } - - private static void writeContent(String separator, StringBuilder contents, CharSequence body, String[] matches) { - String matchesString = Joiner.on(separator).useForNull("").join(Arrays.asList(matches).iterator()); - contents.append(matchesString).append(separator).append(body); - } - - public void map(IntWritable key, BytesWritable value, Context context) - throws IOException, InterruptedException { - Configuration configuration = context.getConfiguration(); - Path filePath = ((CombineFileSplit) context.getInputSplit()).getPath(key.get()); - String relativeFilePath = HadoopUtil.calcRelativeFilePath(configuration, filePath); - ByteArrayInputStream is = new ByteArrayInputStream(value.getBytes()); - parseMailboxLineByLine(relativeFilePath, is, context); - } -} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java ---------------------------------------------------------------------- diff --git a/integration/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java b/integration/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java deleted file mode 100644 index cacfd22..0000000 --- a/integration/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java +++ /dev/null @@ -1,73 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.text; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; -import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; -import org.apache.hadoop.util.ToolRunner; -import org.apache.mahout.common.AbstractJob; - -import java.io.IOException; - -public class TextParagraphSplittingJob extends AbstractJob { - - @Override - public int run(String[] strings) throws Exception { - Configuration originalConf = getConf(); - Job job = prepareJob(new Path(originalConf.get("mapred.input.dir")), - new Path(originalConf.get("mapred.output.dir")), - SequenceFileInputFormat.class, - SplitMap.class, - Text.class, - Text.class, - Reducer.class, - Text.class, - Text.class, - SequenceFileOutputFormat.class); - job.setNumReduceTasks(0); - boolean succeeded = job.waitForCompletion(true); - return succeeded ? 0 : -1; - } - - public static class SplitMap extends Mapper<Text,Text,Text,Text> { - - @Override - protected void map(Text key, Text text, Context context) throws IOException, InterruptedException { - Text outText = new Text(); - int loc = 0; - while (loc >= 0 && loc < text.getLength()) { - int nextLoc = text.find("\n\n", loc + 1); - if (nextLoc > 0) { - outText.set(text.getBytes(), loc, nextLoc - loc); - context.write(key, outText); - } - loc = nextLoc; - } - } - } - - public static void main(String[] args) throws Exception { - ToolRunner.run(new TextParagraphSplittingJob(), args); - } -} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/text/WholeFileRecordReader.java ---------------------------------------------------------------------- diff --git a/integration/src/main/java/org/apache/mahout/text/WholeFileRecordReader.java b/integration/src/main/java/org/apache/mahout/text/WholeFileRecordReader.java deleted file mode 100644 index b8441b7..0000000 --- a/integration/src/main/java/org/apache/mahout/text/WholeFileRecordReader.java +++ /dev/null @@ -1,125 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.mahout.text; - -import java.io.IOException; - -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit; -import org.apache.hadoop.mapreduce.lib.input.FileSplit; - -import static org.apache.mahout.text.SequenceFilesFromDirectory.FILE_FILTER_CLASS_OPTION; - -/** - * RecordReader used with the MultipleTextFileInputFormat class to read full files as - * k/v pairs and groups of files as single input splits. - */ -public class WholeFileRecordReader extends RecordReader<IntWritable, BytesWritable> { - - private FileSplit fileSplit; - private boolean processed = false; - private Configuration configuration; - private BytesWritable value = new BytesWritable(); - private IntWritable index; - private String fileFilterClassName = null; - private PathFilter pathFilter = null; - - public WholeFileRecordReader(CombineFileSplit fileSplit, TaskAttemptContext taskAttemptContext, Integer idx) - throws IOException { - this.fileSplit = new FileSplit(fileSplit.getPath(idx), fileSplit.getOffset(idx), - fileSplit.getLength(idx), fileSplit.getLocations()); - this.configuration = taskAttemptContext.getConfiguration(); - this.index = new IntWritable(idx); - this.fileFilterClassName = this.configuration.get(FILE_FILTER_CLASS_OPTION[0]); - } - - @Override - public IntWritable getCurrentKey() { - return index; - } - - @Override - public BytesWritable getCurrentValue() { - return value; - } - - @Override - public float getProgress() throws IOException { - return processed ? 1.0f : 0.0f; - } - - @Override - public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) - throws IOException, InterruptedException { - if (!StringUtils.isBlank(fileFilterClassName) && - !PrefixAdditionFilter.class.getName().equals(fileFilterClassName)) { - try { - pathFilter = (PathFilter) Class.forName(fileFilterClassName).newInstance(); - } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) { - throw new IllegalStateException(e); - } - } - } - - @Override - public boolean nextKeyValue() throws IOException { - if (!processed) { - byte[] contents = new byte[(int) fileSplit.getLength()]; - Path file = fileSplit.getPath(); - FileSystem fs = file.getFileSystem(this.configuration); - - if (!fs.isFile(file)) { - return false; - } - - FileStatus[] fileStatuses; - if (pathFilter != null) { - fileStatuses = fs.listStatus(file, pathFilter); - } else { - fileStatuses = fs.listStatus(file); - } - - if (fileStatuses.length == 1) { - try (FSDataInputStream in = fs.open(fileStatuses[0].getPath())) { - IOUtils.readFully(in, contents, 0, contents.length); - value.setCapacity(contents.length); - value.set(contents, 0, contents.length); - } - processed = true; - return true; - } - } - return false; - } - - @Override - public void close() throws IOException { - } -} \ No newline at end of file
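For readers tracing these removals: every class deleted in this commit was driven through Hadoop's ToolRunner, as the deleted main() methods show. Below is a minimal sketch of invoking the mail-archive conversion programmatically, as it worked before this removal. The long-form flag names (--input, --output, --method) and the paths are assumptions based on Mahout's AbstractJob and DefaultOptionCreator conventions, not values visible in this diff.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.text.SequenceFilesFromMailArchives;

public final class MailArchivesConversionExample {
  public static void main(String[] args) throws Exception {
    // Mirrors the deleted main(): hand argument parsing and execution to run() via ToolRunner.
    // Flag names and paths below are illustrative assumptions, not taken from this diff.
    String[] toolArgs = {
        "--input", "/path/to/mail-archives",    // placeholder directory of mbox files
        "--output", "/path/to/sequence-files",  // placeholder output directory
        "--method", "sequential"                // "mapreduce" selects the (also removed) mapper path
    };
    int exitCode = ToolRunner.run(new Configuration(), new SequenceFilesFromMailArchives(), toolArgs);
    System.exit(exitCode);
  }
}

Under the sequential method this walked the input tree with PrefixAdditionDirectoryWalker; under mapreduce it launched the job prepared in runMapReduce() with SequenceFilesFromMailArchivesMapper reading whole files through WholeFileRecordReader.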