http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/integration/src/main/java/org/apache/mahout/text/MailArchivesClusteringAnalyzer.java ---------------------------------------------------------------------- diff --git a/integration/src/main/java/org/apache/mahout/text/MailArchivesClusteringAnalyzer.java b/integration/src/main/java/org/apache/mahout/text/MailArchivesClusteringAnalyzer.java deleted file mode 100644 index 12ed471..0000000 --- a/integration/src/main/java/org/apache/mahout/text/MailArchivesClusteringAnalyzer.java +++ /dev/null @@ -1,164 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.mahout.text; - -import org.apache.lucene.analysis.TokenFilter; -import org.apache.lucene.analysis.TokenStream; -import org.apache.lucene.analysis.Tokenizer; -import org.apache.lucene.analysis.core.LowerCaseFilter; -import org.apache.lucene.analysis.core.StopFilter; -import org.apache.lucene.analysis.en.PorterStemFilter; -import org.apache.lucene.analysis.miscellaneous.ASCIIFoldingFilter; -import org.apache.lucene.analysis.standard.StandardFilter; -import org.apache.lucene.analysis.standard.StandardTokenizer; -import org.apache.lucene.analysis.tokenattributes.CharTermAttribute; -import org.apache.lucene.analysis.util.CharArraySet; -import org.apache.lucene.analysis.util.StopwordAnalyzerBase; - -import java.io.IOException; -import java.util.Arrays; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -/** - * Custom Lucene Analyzer designed for aggressive feature reduction - * for clustering the ASF Mail Archives using an extended set of - * stop words, excluding non-alpha-numeric tokens, and porter stemming. 
- */ -public final class MailArchivesClusteringAnalyzer extends StopwordAnalyzerBase { - // extended set of stop words composed of common mail terms like "hi", - // HTML tags, and Java keywords, as many of the messages in the archives - // are subversion check-in notifications - - private static final CharArraySet STOP_SET = new CharArraySet(Arrays.asList( - "3d","7bit","a0","about","above","abstract","across","additional","after", - "afterwards","again","against","align","all","almost","alone","along", - "already","also","although","always","am","among","amongst","amoungst", - "amount","an","and","another","any","anybody","anyhow","anyone","anything", - "anyway","anywhere","are","arial","around","as","ascii","assert","at", - "back","background","base64","bcc","be","became","because","become","becomes", - "becoming","been","before","beforehand","behind","being","below","beside", - "besides","between","beyond","bgcolor","blank","blockquote","body","boolean", - "border","both","br","break","but","by","can","cannot","cant","case","catch", - "cc","cellpadding","cellspacing","center","char","charset","cheers","class", - "co","color","colspan","com","con","const","continue","could","couldnt", - "cry","css","de","dear","default","did","didnt","different","div","do", - "does","doesnt","done","dont","double","down","due","during","each","eg", - "eight","either","else","elsewhere","empty","encoding","enough","enum", - "etc","eu","even","ever","every","everyone","everything","everywhere", - "except","extends","face","family","few","ffffff","final","finally","float", - "font","for","former","formerly","fri","from","further","get","give","go", - "good","got","goto","gt","h1","ha","had","has","hasnt","have","he","head", - "height","hello","helvetica","hence","her","here","hereafter","hereby", - "herein","hereupon","hers","herself","hi","him","himself","his","how", - "however","hr","href","html","http","https","id","ie","if","ill","im", - "image","img","implements","import","in","inc","instanceof","int","interface", - "into","is","isnt","iso-8859-1","it","its","itself","ive","just","keep", - "last","latter","latterly","least","left","less","li","like","long","look", - "lt","ltd","mail","mailto","many","margin","may","me","meanwhile","message", - "meta","might","mill","mine","mon","more","moreover","most","mostly","mshtml", - "mso","much","must","my","myself","name","namely","native","nbsp","need", - "neither","never","nevertheless","new","next","nine","no","nobody","none", - "noone","nor","not","nothing","now","nowhere","null","of","off","often", - "ok","on","once","only","onto","or","org","other","others","otherwise", - "our","ours","ourselves","out","over","own","package","pad","per","perhaps", - "plain","please","pm","printable","private","protected","public","put", - "quot","quote","r1","r2","rather","re","really","regards","reply","return", - "right","said","same","sans","sat","say","saying","see","seem","seemed", - "seeming","seems","serif","serious","several","she","short","should","show", - "side","since","sincere","six","sixty","size","so","solid","some","somehow", - "someone","something","sometime","sometimes","somewhere","span","src", - "static","still","strictfp","string","strong","style","stylesheet","subject", - "such","sun","super","sure","switch","synchronized","table","take","target", - "td","text","th","than","thanks","that","the","their","them","themselves", - "then","thence","there","thereafter","thereby","therefore","therein","thereupon", -
"these","they","thick","thin","think","third","this","those","though", - "three","through","throughout","throw","throws","thru","thu","thus","tm", - "to","together","too","top","toward","towards","tr","transfer","transient", - "try","tue","type","ul","un","under","unsubscribe","until","up","upon", - "us","use","used","uses","using","valign","verdana","very","via","void", - "volatile","want","was","we","wed","weight","well","were","what","whatever", - "when","whence","whenever","where","whereafter","whereas","whereby","wherein", - "whereupon","wherever","whether","which","while","whither","who","whoever", - "whole","whom","whose","why","width","will","with","within","without", - "wont","would","wrote","www","yes","yet","you","your","yours","yourself", - "yourselves" - ), false); - - // Regex used to exclude non-alpha-numeric tokens - private static final Pattern ALPHA_NUMERIC = Pattern.compile("^[a-z][a-z0-9_]+$"); - private static final Matcher MATCHER = ALPHA_NUMERIC.matcher(""); - - public MailArchivesClusteringAnalyzer() { - super(STOP_SET); - } - - public MailArchivesClusteringAnalyzer(CharArraySet stopSet) { - super(stopSet); - } - - @Override - protected TokenStreamComponents createComponents(String fieldName) { - Tokenizer tokenizer = new StandardTokenizer(); - TokenStream result = new StandardFilter(tokenizer); - result = new LowerCaseFilter(result); - result = new ASCIIFoldingFilter(result); - result = new AlphaNumericMaxLengthFilter(result); - result = new StopFilter(result, STOP_SET); - result = new PorterStemFilter(result); - return new TokenStreamComponents(tokenizer, result); - } - - /** - * Matches alpha-numeric tokens between 2 and 40 chars long. - */ - static class AlphaNumericMaxLengthFilter extends TokenFilter { - private final CharTermAttribute termAtt; - private final char[] output = new char[28]; - - AlphaNumericMaxLengthFilter(TokenStream in) { - super(in); - termAtt = addAttribute(CharTermAttribute.class); - } - - @Override - public final boolean incrementToken() throws IOException { - // return the first alpha-numeric token between 2 and 40 length - while (input.incrementToken()) { - int length = termAtt.length(); - if (length >= 2 && length <= 28) { - char[] buf = termAtt.buffer(); - int at = 0; - for (int c = 0; c < length; c++) { - char ch = buf[c]; - if (ch != '\'') { - output[at++] = ch; - } - } - String term = new String(output, 0, at); - MATCHER.reset(term); - if (MATCHER.matches() && !term.startsWith("a0")) { - termAtt.setEmpty(); - termAtt.append(term); - return true; - } - } - } - return false; - } - } -}
http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/integration/src/main/java/org/apache/mahout/text/MultipleTextFileInputFormat.java ---------------------------------------------------------------------- diff --git a/integration/src/main/java/org/apache/mahout/text/MultipleTextFileInputFormat.java b/integration/src/main/java/org/apache/mahout/text/MultipleTextFileInputFormat.java deleted file mode 100644 index 44df006..0000000 --- a/integration/src/main/java/org/apache/mahout/text/MultipleTextFileInputFormat.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.text; - -import java.io.IOException; - -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat; -import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader; -import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit; - -/** - * - * Used in combining a large number of text files into one text input reader - * along with the WholeFileRecordReader class. - * - */ -public class MultipleTextFileInputFormat extends CombineFileInputFormat<IntWritable, BytesWritable> { - - @Override - public RecordReader<IntWritable, BytesWritable> createRecordReader(InputSplit inputSplit, - TaskAttemptContext taskAttemptContext) - throws IOException { - return new CombineFileRecordReader<>((CombineFileSplit) inputSplit, - taskAttemptContext, WholeFileRecordReader.class); - } -} http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/integration/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java ---------------------------------------------------------------------- diff --git a/integration/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java b/integration/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java deleted file mode 100644 index 37ebc44..0000000 --- a/integration/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java +++ /dev/null @@ -1,67 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.text; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.mahout.common.iterator.FileLineIterable; -import org.apache.mahout.utils.io.ChunkedWriter; - -import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.Charset; -import java.util.Map; - -/** - * Default parser for parsing text into sequence files. - */ -public final class PrefixAdditionFilter extends SequenceFilesFromDirectoryFilter { - - public PrefixAdditionFilter(Configuration conf, - String keyPrefix, - Map<String, String> options, - ChunkedWriter writer, - Charset charset, - FileSystem fs) { - super(conf, keyPrefix, options, writer, charset, fs); - } - - @Override - protected void process(FileStatus fst, Path current) throws IOException { - FileSystem fs = getFs(); - ChunkedWriter writer = getWriter(); - if (fst.isDir()) { - String dirPath = getPrefix() + Path.SEPARATOR + current.getName() + Path.SEPARATOR + fst.getPath().getName(); - fs.listStatus(fst.getPath(), - new PrefixAdditionFilter(getConf(), dirPath, getOptions(), writer, getCharset(), fs)); - } else { - try (InputStream in = fs.open(fst.getPath())){ - StringBuilder file = new StringBuilder(); - for (String aFit : new FileLineIterable(in, getCharset(), false)) { - file.append(aFit).append('\n'); - } - String name = current.getName().equals(fst.getPath().getName()) - ? current.getName() - : current.getName() + Path.SEPARATOR + fst.getPath().getName(); - writer.write(getPrefix() + Path.SEPARATOR + name, file.toString()); - } - } - } -} http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java ---------------------------------------------------------------------- diff --git a/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java b/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java deleted file mode 100644 index 311ab8d..0000000 --- a/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java +++ /dev/null @@ -1,214 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.mahout.text; - -import java.io.IOException; -import java.nio.charset.Charset; -import java.util.HashMap; -import java.util.Map; - -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; -import org.apache.hadoop.util.ToolRunner; -import org.apache.mahout.common.AbstractJob; -import org.apache.mahout.common.ClassUtils; -import org.apache.mahout.common.HadoopUtil; -import org.apache.mahout.common.commandline.DefaultOptionCreator; -import org.apache.mahout.utils.io.ChunkedWriter; - -/** - * Converts a directory of text documents into SequenceFiles of Specified chunkSize. This class takes in a - * parent directory containing sub folders of text documents and recursively reads the files and creates the - * {@link org.apache.hadoop.io.SequenceFile}s of docid => content. The docid is set as the relative path of the - * document from the parent directory prepended with a specified prefix. You can also specify the input encoding - * of the text files. The content of the output SequenceFiles are encoded as UTF-8 text. - */ -public class SequenceFilesFromDirectory extends AbstractJob { - - private static final String PREFIX_ADDITION_FILTER = PrefixAdditionFilter.class.getName(); - - private static final String[] CHUNK_SIZE_OPTION = {"chunkSize", "chunk"}; - public static final String[] FILE_FILTER_CLASS_OPTION = {"fileFilterClass", "filter"}; - private static final String[] CHARSET_OPTION = {"charset", "c"}; - - private static final int MAX_JOB_SPLIT_LOCATIONS = 1000000; - - public static final String[] KEY_PREFIX_OPTION = {"keyPrefix", "prefix"}; - public static final String BASE_INPUT_PATH = "baseinputpath"; - - public static void main(String[] args) throws Exception { - ToolRunner.run(new SequenceFilesFromDirectory(), args); - } - - /* - * callback main after processing MapReduce parameters - */ - @Override - public int run(String[] args) throws Exception { - addOptions(); - addOption(DefaultOptionCreator.methodOption().create()); - addOption(DefaultOptionCreator.overwriteOption().create()); - - if (parseArguments(args) == null) { - return -1; - } - - Map<String, String> options = parseOptions(); - Path output = getOutputPath(); - if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) { - HadoopUtil.delete(getConf(), output); - } - - if (getOption(DefaultOptionCreator.METHOD_OPTION, - DefaultOptionCreator.MAPREDUCE_METHOD).equals(DefaultOptionCreator.SEQUENTIAL_METHOD)) { - runSequential(getConf(), getInputPath(), output, options); - } else { - runMapReduce(getInputPath(), output); - } - - return 0; - } - - private int runSequential(Configuration conf, Path input, Path output, Map<String, String> options) - throws IOException, InterruptedException, NoSuchMethodException { - // Running sequentially - Charset charset = Charset.forName(getOption(CHARSET_OPTION[0])); - String keyPrefix = getOption(KEY_PREFIX_OPTION[0]); - FileSystem fs = FileSystem.get(input.toUri(), conf); - - try (ChunkedWriter writer = new ChunkedWriter(conf, Integer.parseInt(options.get(CHUNK_SIZE_OPTION[0])), output)) { - 
SequenceFilesFromDirectoryFilter pathFilter; - String fileFilterClassName = options.get(FILE_FILTER_CLASS_OPTION[0]); - if (PrefixAdditionFilter.class.getName().equals(fileFilterClassName)) { - pathFilter = new PrefixAdditionFilter(conf, keyPrefix, options, writer, charset, fs); - } else { - pathFilter = ClassUtils.instantiateAs(fileFilterClassName, SequenceFilesFromDirectoryFilter.class, - new Class[] {Configuration.class, String.class, Map.class, ChunkedWriter.class, Charset.class, FileSystem.class}, - new Object[] {conf, keyPrefix, options, writer, charset, fs}); - } - fs.listStatus(input, pathFilter); - } - return 0; - } - - private int runMapReduce(Path input, Path output) throws IOException, ClassNotFoundException, InterruptedException { - - int chunkSizeInMB = 64; - if (hasOption(CHUNK_SIZE_OPTION[0])) { - chunkSizeInMB = Integer.parseInt(getOption(CHUNK_SIZE_OPTION[0])); - } - - String keyPrefix = null; - if (hasOption(KEY_PREFIX_OPTION[0])) { - keyPrefix = getOption(KEY_PREFIX_OPTION[0]); - } - - String fileFilterClassName = null; - if (hasOption(FILE_FILTER_CLASS_OPTION[0])) { - fileFilterClassName = getOption(FILE_FILTER_CLASS_OPTION[0]); - } - - PathFilter pathFilter = null; - // Prefix Addition is presently handled in the Mapper and unlike runsequential() - // need not be done via a pathFilter - if (!StringUtils.isBlank(fileFilterClassName) && !PrefixAdditionFilter.class.getName().equals(fileFilterClassName)) { - try { - pathFilter = (PathFilter) Class.forName(fileFilterClassName).newInstance(); - } catch (InstantiationException | IllegalAccessException e) { - throw new IllegalStateException(e); - } - } - - // Prepare Job for submission. - Job job = prepareJob(input, output, MultipleTextFileInputFormat.class, - SequenceFilesFromDirectoryMapper.class, Text.class, Text.class, - SequenceFileOutputFormat.class, "SequenceFilesFromDirectory"); - - Configuration jobConfig = job.getConfiguration(); - jobConfig.set(KEY_PREFIX_OPTION[0], keyPrefix); - jobConfig.set(FILE_FILTER_CLASS_OPTION[0], fileFilterClassName); - - FileSystem fs = FileSystem.get(jobConfig); - FileStatus fsFileStatus = fs.getFileStatus(input); - - String inputDirList; - if (pathFilter != null) { - inputDirList = HadoopUtil.buildDirList(fs, fsFileStatus, pathFilter); - } else { - inputDirList = HadoopUtil.buildDirList(fs, fsFileStatus); - } - - jobConfig.set(BASE_INPUT_PATH, input.toString()); - - long chunkSizeInBytes = chunkSizeInMB * 1024 * 1024; - - // set the max split locations, otherwise we get nasty debug stuff - jobConfig.set("mapreduce.job.max.split.locations", String.valueOf(MAX_JOB_SPLIT_LOCATIONS)); - - FileInputFormat.setInputPaths(job, inputDirList); - // need to set this to a multiple of the block size, or no split happens - FileInputFormat.setMaxInputSplitSize(job, chunkSizeInBytes); - FileOutputFormat.setCompressOutput(job, true); - - boolean succeeded = job.waitForCompletion(true); - if (!succeeded) { - return -1; - } - return 0; - } - - /** - * Override this method in order to add additional options to the command line of the SequenceFileFromDirectory job. - * Do not forget to call super() otherwise all standard options (input/output dirs etc) will not be available. - */ - protected void addOptions() { - addInputOption(); - addOutputOption(); - addOption(DefaultOptionCreator.overwriteOption().create()); - addOption(DefaultOptionCreator.methodOption().create()); - addOption(CHUNK_SIZE_OPTION[0], CHUNK_SIZE_OPTION[1], "The chunkSize in MegaBytes. 
Defaults to 64", "64"); - addOption(FILE_FILTER_CLASS_OPTION[0], FILE_FILTER_CLASS_OPTION[1], - "The name of the class to use for file parsing. Default: " + PREFIX_ADDITION_FILTER, PREFIX_ADDITION_FILTER); - addOption(KEY_PREFIX_OPTION[0], KEY_PREFIX_OPTION[1], "The prefix to be prepended to the key", ""); - addOption(CHARSET_OPTION[0], CHARSET_OPTION[1], - "The name of the character encoding of the input files. Default to UTF-8", "UTF-8"); - } - - /** - * Override this method in order to parse your additional options from the command line. Do not forget to call - * super() otherwise standard options (input/output dirs etc) will not be available. - * - * @return Map of options - */ - protected Map<String, String> parseOptions() { - Map<String, String> options = new HashMap<>(); - options.put(CHUNK_SIZE_OPTION[0], getOption(CHUNK_SIZE_OPTION[0])); - options.put(FILE_FILTER_CLASS_OPTION[0], getOption(FILE_FILTER_CLASS_OPTION[0])); - options.put(CHARSET_OPTION[0], getOption(CHARSET_OPTION[0])); - return options; - } -} http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryFilter.java ---------------------------------------------------------------------- diff --git a/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryFilter.java b/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryFilter.java deleted file mode 100644 index 6e4bd64..0000000 --- a/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryFilter.java +++ /dev/null @@ -1,99 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.text; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; -import org.apache.mahout.utils.io.ChunkedWriter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.nio.charset.Charset; -import java.util.Map; - -/** - * Implement this interface if you wish to extend SequenceFilesFromDirectory with your own parsing logic. 
- */ -public abstract class SequenceFilesFromDirectoryFilter implements PathFilter { - private static final Logger log = LoggerFactory.getLogger(SequenceFilesFromDirectoryFilter.class); - - private final String prefix; - private final ChunkedWriter writer; - private final Charset charset; - private final FileSystem fs; - private final Map<String, String> options; - private final Configuration conf; - - protected SequenceFilesFromDirectoryFilter(Configuration conf, - String keyPrefix, - Map<String, String> options, - ChunkedWriter writer, - Charset charset, - FileSystem fs) { - this.prefix = keyPrefix; - this.writer = writer; - this.charset = charset; - this.fs = fs; - this.options = options; - this.conf = conf; - } - - protected final String getPrefix() { - return prefix; - } - - protected final ChunkedWriter getWriter() { - return writer; - } - - protected final Charset getCharset() { - return charset; - } - - protected final FileSystem getFs() { - return fs; - } - - protected final Map<String, String> getOptions() { - return options; - } - - protected final Configuration getConf() { - return conf; - } - - @Override - public final boolean accept(Path current) { - log.debug("CURRENT: {}", current.getName()); - try { - for (FileStatus fst : fs.listStatus(current)) { - log.debug("CHILD: {}", fst.getPath().getName()); - process(fst, current); - } - } catch (IOException ioe) { - throw new IllegalStateException(ioe); - } - return false; - } - - protected abstract void process(FileStatus in, Path current) throws IOException; -} http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java ---------------------------------------------------------------------- diff --git a/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java b/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java deleted file mode 100644 index 40df3c2..0000000 --- a/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.mahout.text; - -import java.io.IOException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit; -import org.apache.mahout.common.HadoopUtil; - -import static org.apache.mahout.text.SequenceFilesFromDirectory.KEY_PREFIX_OPTION; - -/** - * Map class for SequenceFilesFromDirectory MR job - */ -public class SequenceFilesFromDirectoryMapper extends Mapper<IntWritable, BytesWritable, Text, Text> { - - private String keyPrefix; - private Text fileValue = new Text(); - - @Override - protected void setup(Context context) throws IOException, InterruptedException { - super.setup(context); - this.keyPrefix = context.getConfiguration().get(KEY_PREFIX_OPTION[0], ""); - } - - public void map(IntWritable key, BytesWritable value, Context context) - throws IOException, InterruptedException { - - Configuration configuration = context.getConfiguration(); - Path filePath = ((CombineFileSplit) context.getInputSplit()).getPath(key.get()); - String relativeFilePath = HadoopUtil.calcRelativeFilePath(configuration, filePath); - - String filename = this.keyPrefix.length() > 0 ? - this.keyPrefix + Path.SEPARATOR + relativeFilePath : - Path.SEPARATOR + relativeFilePath; - - fileValue.set(value.getBytes(), 0, value.getBytes().length); - context.write(new Text(filename), fileValue); - } -} http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java ---------------------------------------------------------------------- diff --git a/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java b/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java deleted file mode 100644 index c17cc12..0000000 --- a/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java +++ /dev/null @@ -1,369 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.mahout.text; - -import org.apache.commons.io.DirectoryWalker; -import org.apache.commons.io.comparator.CompositeFileComparator; -import org.apache.commons.io.comparator.DirectoryFileComparator; -import org.apache.commons.io.comparator.PathFileComparator; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; -import org.apache.hadoop.util.ToolRunner; -import org.apache.mahout.common.AbstractJob; -import org.apache.mahout.common.HadoopUtil; -import org.apache.mahout.common.commandline.DefaultOptionCreator; -import org.apache.mahout.utils.email.MailOptions; -import org.apache.mahout.utils.email.MailProcessor; -import org.apache.mahout.utils.io.ChunkedWriter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.nio.charset.Charset; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Comparator; -import java.util.Deque; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.regex.Pattern; - -/** - * Converts a directory of gzipped mail archives into SequenceFiles of specified - * chunkSize. This class is similar to {@link SequenceFilesFromDirectory} except - * it uses block-compressed {@link org.apache.hadoop.io.SequenceFile}s and parses out the subject and - * body text of each mail message into a separate key/value pair. - */ -public final class SequenceFilesFromMailArchives extends AbstractJob { - - private static final Logger log = LoggerFactory.getLogger(SequenceFilesFromMailArchives.class); - - public static final String[] CHUNK_SIZE_OPTION = {"chunkSize", "chunk"}; - public static final String[] KEY_PREFIX_OPTION = {"keyPrefix", "prefix"}; - public static final String[] CHARSET_OPTION = {"charset", "c"}; - public static final String[] SUBJECT_OPTION = {"subject", "s"}; - public static final String[] TO_OPTION = {"to", "to"}; - public static final String[] FROM_OPTION = {"from", "from"}; - public static final String[] REFERENCES_OPTION = {"references", "refs"}; - public static final String[] BODY_OPTION = {"body", "b"}; - public static final String[] STRIP_QUOTED_OPTION = {"stripQuoted", "q"}; - public static final String[] QUOTED_REGEX_OPTION = {"quotedRegex", "regex"}; - public static final String[] SEPARATOR_OPTION = {"separator", "sep"}; - public static final String[] BODY_SEPARATOR_OPTION = {"bodySeparator", "bodySep"}; - public static final String BASE_INPUT_PATH = "baseinputpath"; - - private static final int MAX_JOB_SPLIT_LOCATIONS = 1000000; - - public void createSequenceFiles(MailOptions options) throws IOException { - try (ChunkedWriter writer = - new ChunkedWriter(getConf(), options.getChunkSize(), new Path(options.getOutputDir()))){ - MailProcessor processor = new MailProcessor(options, options.getPrefix(), writer); - if (options.getInput().isDirectory()) { - PrefixAdditionDirectoryWalker walker = new PrefixAdditionDirectoryWalker(processor, writer); - walker.walk(options.getInput()); - log.info("Parsed {} messages from {}", walker.getMessageCount(), options.getInput().getAbsolutePath()); - } else { - long start = System.currentTimeMillis(); - long 
cnt = processor.parseMboxLineByLine(options.getInput()); - long finish = System.currentTimeMillis(); - log.info("Parsed {} messages from {} in time: {}", cnt, options.getInput().getAbsolutePath(), finish - start); - } - } - } - - private static class PrefixAdditionDirectoryWalker extends DirectoryWalker<Object> { - - @SuppressWarnings("unchecked") - private static final Comparator<File> FILE_COMPARATOR = new CompositeFileComparator( - DirectoryFileComparator.DIRECTORY_REVERSE, PathFileComparator.PATH_COMPARATOR); - - private final Deque<MailProcessor> processors = new ArrayDeque<>(); - private final ChunkedWriter writer; - private final Deque<Long> messageCounts = new ArrayDeque<>(); - - public PrefixAdditionDirectoryWalker(MailProcessor processor, ChunkedWriter writer) { - processors.addFirst(processor); - this.writer = writer; - messageCounts.addFirst(0L); - } - - public void walk(File startDirectory) throws IOException { - super.walk(startDirectory, null); - } - - public long getMessageCount() { - return messageCounts.getFirst(); - } - - @Override - protected void handleDirectoryStart(File current, int depth, Collection<Object> results) throws IOException { - if (depth > 0) { - log.info("At {}", current.getAbsolutePath()); - MailProcessor processor = processors.getFirst(); - MailProcessor subDirProcessor = new MailProcessor(processor.getOptions(), processor.getPrefix() - + File.separator + current.getName(), writer); - processors.push(subDirProcessor); - messageCounts.push(0L); - } - } - - @Override - protected File[] filterDirectoryContents(File directory, int depth, File[] files) throws IOException { - Arrays.sort(files, FILE_COMPARATOR); - return files; - } - - @Override - protected void handleFile(File current, int depth, Collection<Object> results) throws IOException { - MailProcessor processor = processors.getFirst(); - long currentDirMessageCount = messageCounts.pop(); - try { - currentDirMessageCount += processor.parseMboxLineByLine(current); - } catch (IOException e) { - throw new IllegalStateException("Error processing " + current, e); - } - messageCounts.push(currentDirMessageCount); - } - - @Override - protected void handleDirectoryEnd(File current, int depth, Collection<Object> results) throws IOException { - if (depth > 0) { - final long currentDirMessageCount = messageCounts.pop(); - log.info("Parsed {} messages from directory {}", currentDirMessageCount, current.getAbsolutePath()); - - processors.pop(); - - // aggregate message counts - long parentDirMessageCount = messageCounts.pop(); - parentDirMessageCount += currentDirMessageCount; - messageCounts.push(parentDirMessageCount); - } - } - } - - public static void main(String[] args) throws Exception { - ToolRunner.run(new Configuration(), new SequenceFilesFromMailArchives(), args); - } - - @Override - public int run(String[] args) throws Exception { - addInputOption(); - addOutputOption(); - addOption(DefaultOptionCreator.methodOption().create()); - - addOption(CHUNK_SIZE_OPTION[0], CHUNK_SIZE_OPTION[1], "The chunkSize in MegaBytes. Defaults to 64", "64"); - addOption(KEY_PREFIX_OPTION[0], KEY_PREFIX_OPTION[1], "The prefix to be prepended to the key", ""); - addOption(CHARSET_OPTION[0], CHARSET_OPTION[1], - "The name of the character encoding of the input files. Default to UTF-8", "UTF-8"); - addFlag(SUBJECT_OPTION[0], SUBJECT_OPTION[1], "Include the Mail subject as part of the text. Default is false"); - addFlag(TO_OPTION[0], TO_OPTION[1], "Include the to field in the text. 
Default is false"); - addFlag(FROM_OPTION[0], FROM_OPTION[1], "Include the from field in the text. Default is false"); - addFlag(REFERENCES_OPTION[0], REFERENCES_OPTION[1], - "Include the references field in the text. Default is false"); - addFlag(BODY_OPTION[0], BODY_OPTION[1], "Include the body in the output. Default is false"); - addFlag(STRIP_QUOTED_OPTION[0], STRIP_QUOTED_OPTION[1], - "Strip (remove) quoted email text in the body. Default is false"); - addOption(QUOTED_REGEX_OPTION[0], QUOTED_REGEX_OPTION[1], - "Specify the regex that identifies quoted text. " - + "Default is to look for > or | at the beginning of the line."); - addOption(SEPARATOR_OPTION[0], SEPARATOR_OPTION[1], - "The separator to use between metadata items (to, from, etc.). Default is \\n", "\n"); - addOption(BODY_SEPARATOR_OPTION[0], BODY_SEPARATOR_OPTION[1], - "The separator to use between lines in the body. Default is \\n. " - + "Useful to change if you wish to have the message be on one line", "\n"); - - addOption(DefaultOptionCreator.helpOption()); - Map<String, List<String>> parsedArgs = parseArguments(args); - if (parsedArgs == null) { - return -1; - } - File input = getInputFile(); - String outputDir = getOutputPath().toString(); - - int chunkSize = 64; - if (hasOption(CHUNK_SIZE_OPTION[0])) { - chunkSize = Integer.parseInt(getOption(CHUNK_SIZE_OPTION[0])); - } - - String prefix = ""; - if (hasOption(KEY_PREFIX_OPTION[0])) { - prefix = getOption(KEY_PREFIX_OPTION[0]); - } - - Charset charset = Charset.forName(getOption(CHARSET_OPTION[0])); - MailOptions options = new MailOptions(); - options.setInput(input); - options.setOutputDir(outputDir); - options.setPrefix(prefix); - options.setChunkSize(chunkSize); - options.setCharset(charset); - - List<Pattern> patterns = new ArrayList<>(5); - // patternOrder is used downstream so that we can know what order the text - // is in instead of encoding it in the string, which - // would require more processing later to remove it pre feature selection. 
- Map<String, Integer> patternOrder = new HashMap<>(); - int order = 0; - if (hasOption(FROM_OPTION[0])) { - patterns.add(MailProcessor.FROM_PREFIX); - patternOrder.put(MailOptions.FROM, order++); - } - if (hasOption(TO_OPTION[0])) { - patterns.add(MailProcessor.TO_PREFIX); - patternOrder.put(MailOptions.TO, order++); - } - if (hasOption(REFERENCES_OPTION[0])) { - patterns.add(MailProcessor.REFS_PREFIX); - patternOrder.put(MailOptions.REFS, order++); - } - if (hasOption(SUBJECT_OPTION[0])) { - patterns.add(MailProcessor.SUBJECT_PREFIX); - patternOrder.put(MailOptions.SUBJECT, order += 1); - } - options.setStripQuotedText(hasOption(STRIP_QUOTED_OPTION[0])); - - options.setPatternsToMatch(patterns.toArray(new Pattern[patterns.size()])); - options.setPatternOrder(patternOrder); - options.setIncludeBody(hasOption(BODY_OPTION[0])); - - if (hasOption(SEPARATOR_OPTION[0])) { - options.setSeparator(getOption(SEPARATOR_OPTION[0])); - } else { - options.setSeparator("\n"); - } - - if (hasOption(BODY_SEPARATOR_OPTION[0])) { - options.setBodySeparator(getOption(BODY_SEPARATOR_OPTION[0])); - } - - if (hasOption(QUOTED_REGEX_OPTION[0])) { - options.setQuotedTextPattern(Pattern.compile(getOption(QUOTED_REGEX_OPTION[0]))); - } - - if (getOption(DefaultOptionCreator.METHOD_OPTION, - DefaultOptionCreator.MAPREDUCE_METHOD).equals(DefaultOptionCreator.SEQUENTIAL_METHOD)) { - runSequential(options); - } else { - runMapReduce(getInputPath(), getOutputPath()); - } - - return 0; - } - - private int runSequential(MailOptions options) - throws IOException, InterruptedException, NoSuchMethodException { - - long start = System.currentTimeMillis(); - createSequenceFiles(options); - long finish = System.currentTimeMillis(); - log.info("Conversion took {}ms", finish - start); - - return 0; - } - - private int runMapReduce(Path input, Path output) throws IOException, InterruptedException, ClassNotFoundException { - - Job job = prepareJob(input, output, MultipleTextFileInputFormat.class, SequenceFilesFromMailArchivesMapper.class, - Text.class, Text.class, SequenceFileOutputFormat.class, "SequentialFilesFromMailArchives"); - - Configuration jobConfig = job.getConfiguration(); - - if (hasOption(KEY_PREFIX_OPTION[0])) { - jobConfig.set(KEY_PREFIX_OPTION[1], getOption(KEY_PREFIX_OPTION[0])); - } - - int chunkSize = 0; - if (hasOption(CHUNK_SIZE_OPTION[0])) { - chunkSize = Integer.parseInt(getOption(CHUNK_SIZE_OPTION[0])); - jobConfig.set(CHUNK_SIZE_OPTION[0], String.valueOf(chunkSize)); - } - - Charset charset; - if (hasOption(CHARSET_OPTION[0])) { - charset = Charset.forName(getOption(CHARSET_OPTION[0])); - jobConfig.set(CHARSET_OPTION[0], charset.displayName()); - } - - if (hasOption(FROM_OPTION[0])) { - jobConfig.set(FROM_OPTION[1], "true"); - } - - if (hasOption(TO_OPTION[0])) { - jobConfig.set(TO_OPTION[1], "true"); - } - - if (hasOption(REFERENCES_OPTION[0])) { - jobConfig.set(REFERENCES_OPTION[1], "true"); - } - - if (hasOption(SUBJECT_OPTION[0])) { - jobConfig.set(SUBJECT_OPTION[1], "true"); - } - - if (hasOption(QUOTED_REGEX_OPTION[0])) { - jobConfig.set(QUOTED_REGEX_OPTION[1], Pattern.compile(getOption(QUOTED_REGEX_OPTION[0])).toString()); - } - - if (hasOption(SEPARATOR_OPTION[0])) { - jobConfig.set(SEPARATOR_OPTION[1], getOption(SEPARATOR_OPTION[0])); - } else { - jobConfig.set(SEPARATOR_OPTION[1], "\n"); - } - - if (hasOption(BODY_OPTION[0])) { - jobConfig.set(BODY_OPTION[1], "true"); - } else { - jobConfig.set(BODY_OPTION[1], "false"); - } - - if (hasOption(BODY_SEPARATOR_OPTION[0])) { - 
jobConfig.set(BODY_SEPARATOR_OPTION[1], getOption(BODY_SEPARATOR_OPTION[0])); - } else { - jobConfig.set(BODY_SEPARATOR_OPTION[1], "\n"); - } - - FileSystem fs = FileSystem.get(jobConfig); - FileStatus fsFileStatus = fs.getFileStatus(inputPath); - - jobConfig.set(BASE_INPUT_PATH, inputPath.toString()); - String inputDirList = HadoopUtil.buildDirList(fs, fsFileStatus); - FileInputFormat.setInputPaths(job, inputDirList); - - long chunkSizeInBytes = chunkSize * 1024 * 1024; - // need to set this to a multiple of the block size, or no split happens - FileInputFormat.setMaxInputSplitSize(job, chunkSizeInBytes); - - // set the max split locations, otherwise we get nasty debug stuff - jobConfig.set("mapreduce.job.max.split.locations", String.valueOf(MAX_JOB_SPLIT_LOCATIONS)); - - boolean succeeded = job.waitForCompletion(true); - if (!succeeded) { - return -1; - } - return 0; - } -} http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java ---------------------------------------------------------------------- diff --git a/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java b/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java deleted file mode 100644 index 203e8fb..0000000 --- a/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java +++ /dev/null @@ -1,244 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.mahout.text; - -import com.google.common.base.Joiner; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit; -import org.apache.mahout.common.HadoopUtil; -import org.apache.mahout.common.iterator.FileLineIterable; -import org.apache.mahout.utils.email.MailOptions; -import org.apache.mahout.utils.email.MailProcessor; - -import java.io.ByteArrayInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.Charset; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import static org.apache.mahout.text.SequenceFilesFromMailArchives.BODY_OPTION; -import static org.apache.mahout.text.SequenceFilesFromMailArchives.BODY_SEPARATOR_OPTION; -import static org.apache.mahout.text.SequenceFilesFromMailArchives.CHARSET_OPTION; -import static org.apache.mahout.text.SequenceFilesFromMailArchives.CHUNK_SIZE_OPTION; -import static org.apache.mahout.text.SequenceFilesFromMailArchives.FROM_OPTION; -import static org.apache.mahout.text.SequenceFilesFromMailArchives.KEY_PREFIX_OPTION; -import static org.apache.mahout.text.SequenceFilesFromMailArchives.QUOTED_REGEX_OPTION; -import static org.apache.mahout.text.SequenceFilesFromMailArchives.REFERENCES_OPTION; -import static org.apache.mahout.text.SequenceFilesFromMailArchives.SEPARATOR_OPTION; -import static org.apache.mahout.text.SequenceFilesFromMailArchives.STRIP_QUOTED_OPTION; -import static org.apache.mahout.text.SequenceFilesFromMailArchives.SUBJECT_OPTION; -import static org.apache.mahout.text.SequenceFilesFromMailArchives.TO_OPTION; - -/** - * Map Class for the SequenceFilesFromMailArchives job - */ -public class SequenceFilesFromMailArchivesMapper extends Mapper<IntWritable, BytesWritable, Text, Text> { - - private Text outKey = new Text(); - private Text outValue = new Text(); - - private static final Pattern MESSAGE_START = Pattern.compile( - "^From \\S+@\\S.*\\d{4}$", Pattern.CASE_INSENSITIVE); - private static final Pattern MESSAGE_ID_PREFIX = Pattern.compile( - "^message-id: <(.*)>$", Pattern.CASE_INSENSITIVE); - - private MailOptions options; - - @Override - public void setup(Context context) throws IOException, InterruptedException { - - Configuration configuration = context.getConfiguration(); - - // absorb all of the options into the MailOptions object - this.options = new MailOptions(); - - options.setPrefix(configuration.get(KEY_PREFIX_OPTION[1], "")); - - if (!configuration.get(CHUNK_SIZE_OPTION[0], "").equals("")) { - options.setChunkSize(configuration.getInt(CHUNK_SIZE_OPTION[0], 64)); - } - - if (!configuration.get(CHARSET_OPTION[0], "").equals("")) { - Charset charset = Charset.forName(configuration.get(CHARSET_OPTION[0], "UTF-8")); - options.setCharset(charset); - } else { - Charset charset = Charset.forName("UTF-8"); - options.setCharset(charset); - } - - List<Pattern> patterns = Lists.newArrayListWithCapacity(5); - // patternOrder is used downstream so that we can know what order the - // text is in instead - // of encoding it in the string, which - // would require more processing later to remove it pre feature - // 
selection. - Map<String, Integer> patternOrder = Maps.newHashMap(); - int order = 0; - if (!configuration.get(FROM_OPTION[1], "").equals("")) { - patterns.add(MailProcessor.FROM_PREFIX); - patternOrder.put(MailOptions.FROM, order++); - } - - if (!configuration.get(TO_OPTION[1], "").equals("")) { - patterns.add(MailProcessor.TO_PREFIX); - patternOrder.put(MailOptions.TO, order++); - } - - if (!configuration.get(REFERENCES_OPTION[1], "").equals("")) { - patterns.add(MailProcessor.REFS_PREFIX); - patternOrder.put(MailOptions.REFS, order++); - } - - if (!configuration.get(SUBJECT_OPTION[1], "").equals("")) { - patterns.add(MailProcessor.SUBJECT_PREFIX); - patternOrder.put(MailOptions.SUBJECT, order += 1); - } - - options.setStripQuotedText(configuration.getBoolean(STRIP_QUOTED_OPTION[1], false)); - - options.setPatternsToMatch(patterns.toArray(new Pattern[patterns.size()])); - options.setPatternOrder(patternOrder); - - options.setIncludeBody(configuration.getBoolean(BODY_OPTION[1], false)); - - options.setSeparator("\n"); - if (!configuration.get(SEPARATOR_OPTION[1], "").equals("")) { - options.setSeparator(configuration.get(SEPARATOR_OPTION[1], "")); - } - if (!configuration.get(BODY_SEPARATOR_OPTION[1], "").equals("")) { - options.setBodySeparator(configuration.get(BODY_SEPARATOR_OPTION[1], "")); - } - if (!configuration.get(QUOTED_REGEX_OPTION[1], "").equals("")) { - options.setQuotedTextPattern(Pattern.compile(configuration.get(QUOTED_REGEX_OPTION[1], ""))); - } - - } - - public long parseMailboxLineByLine(String filename, InputStream mailBoxInputStream, Context context) - throws IOException, InterruptedException { - long messageCount = 0; - try { - StringBuilder contents = new StringBuilder(); - StringBuilder body = new StringBuilder(); - Matcher messageIdMatcher = MESSAGE_ID_PREFIX.matcher(""); - Matcher messageBoundaryMatcher = MESSAGE_START.matcher(""); - String[] patternResults = new String[options.getPatternsToMatch().length]; - Matcher[] matches = new Matcher[options.getPatternsToMatch().length]; - for (int i = 0; i < matches.length; i++) { - matches[i] = options.getPatternsToMatch()[i].matcher(""); - } - - String messageId = null; - boolean inBody = false; - Pattern quotedTextPattern = options.getQuotedTextPattern(); - - for (String nextLine : new FileLineIterable(mailBoxInputStream, options.getCharset(), false, filename)) { - if (!options.isStripQuotedText() || !quotedTextPattern.matcher(nextLine).find()) { - for (int i = 0; i < matches.length; i++) { - Matcher matcher = matches[i]; - matcher.reset(nextLine); - if (matcher.matches()) { - patternResults[i] = matcher.group(1); - } - } - - // only start appending body content after we've seen a message ID - if (messageId != null) { - // first, see if we hit the end of the message - messageBoundaryMatcher.reset(nextLine); - if (messageBoundaryMatcher.matches()) { - // done parsing this message ... 
write it out - String key = generateKey(filename, options.getPrefix(), messageId); - // if this ordering changes, then also change - // FromEmailToDictionaryMapper - writeContent(options.getSeparator(), contents, body, patternResults); - - this.outKey.set(key); - this.outValue.set(contents.toString()); - context.write(this.outKey, this.outValue); - contents.setLength(0); // reset the buffer - body.setLength(0); - messageId = null; - inBody = false; - } else { - if (inBody && options.isIncludeBody()) { - if (!nextLine.isEmpty()) { - body.append(nextLine).append(options.getBodySeparator()); - } - } else { - // first empty line we see after reading the message Id - // indicates that we are in the body ... - inBody = nextLine.isEmpty(); - } - } - } else { - if (nextLine.length() > 14) { - messageIdMatcher.reset(nextLine); - if (messageIdMatcher.matches()) { - messageId = messageIdMatcher.group(1); - ++messageCount; - } - } - } - } - } - // write the last message in the file if available - if (messageId != null) { - String key = generateKey(filename, options.getPrefix(), messageId); - writeContent(options.getSeparator(), contents, body, patternResults); - this.outKey.set(key); - this.outValue.set(contents.toString()); - context.write(this.outKey, this.outValue); - contents.setLength(0); // reset the buffer - } - } catch (FileNotFoundException ignored) { - - } - return messageCount; - } - - protected static String generateKey(String mboxFilename, String prefix, String messageId) { - return Joiner.on(Path.SEPARATOR).join(Lists.newArrayList(prefix, mboxFilename, messageId).iterator()); - } - - private static void writeContent(String separator, StringBuilder contents, CharSequence body, String[] matches) { - String matchesString = Joiner.on(separator).useForNull("").join(Arrays.asList(matches).iterator()); - contents.append(matchesString).append(separator).append(body); - } - - public void map(IntWritable key, BytesWritable value, Context context) - throws IOException, InterruptedException { - Configuration configuration = context.getConfiguration(); - Path filePath = ((CombineFileSplit) context.getInputSplit()).getPath(key.get()); - String relativeFilePath = HadoopUtil.calcRelativeFilePath(configuration, filePath); - ByteArrayInputStream is = new ByteArrayInputStream(value.getBytes()); - parseMailboxLineByLine(relativeFilePath, is, context); - } -} http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/integration/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java ---------------------------------------------------------------------- diff --git a/integration/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java b/integration/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java deleted file mode 100644 index cacfd22..0000000 --- a/integration/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java +++ /dev/null @@ -1,73 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.text; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; -import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; -import org.apache.hadoop.util.ToolRunner; -import org.apache.mahout.common.AbstractJob; - -import java.io.IOException; - -public class TextParagraphSplittingJob extends AbstractJob { - - @Override - public int run(String[] strings) throws Exception { - Configuration originalConf = getConf(); - Job job = prepareJob(new Path(originalConf.get("mapred.input.dir")), - new Path(originalConf.get("mapred.output.dir")), - SequenceFileInputFormat.class, - SplitMap.class, - Text.class, - Text.class, - Reducer.class, - Text.class, - Text.class, - SequenceFileOutputFormat.class); - job.setNumReduceTasks(0); - boolean succeeded = job.waitForCompletion(true); - return succeeded ? 0 : -1; - } - - public static class SplitMap extends Mapper<Text,Text,Text,Text> { - - @Override - protected void map(Text key, Text text, Context context) throws IOException, InterruptedException { - Text outText = new Text(); - int loc = 0; - while (loc >= 0 && loc < text.getLength()) { - int nextLoc = text.find("\n\n", loc + 1); - if (nextLoc > 0) { - outText.set(text.getBytes(), loc, nextLoc - loc); - context.write(key, outText); - } - loc = nextLoc; - } - } - } - - public static void main(String[] args) throws Exception { - ToolRunner.run(new TextParagraphSplittingJob(), args); - } -} http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/integration/src/main/java/org/apache/mahout/text/WholeFileRecordReader.java ---------------------------------------------------------------------- diff --git a/integration/src/main/java/org/apache/mahout/text/WholeFileRecordReader.java b/integration/src/main/java/org/apache/mahout/text/WholeFileRecordReader.java deleted file mode 100644 index b8441b7..0000000 --- a/integration/src/main/java/org/apache/mahout/text/WholeFileRecordReader.java +++ /dev/null @@ -1,125 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.mahout.text; - -import java.io.IOException; - -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit; -import org.apache.hadoop.mapreduce.lib.input.FileSplit; - -import static org.apache.mahout.text.SequenceFilesFromDirectory.FILE_FILTER_CLASS_OPTION; - -/** - * RecordReader used with the MultipleTextFileInputFormat class to read full files as - * k/v pairs and groups of files as single input splits. - */ -public class WholeFileRecordReader extends RecordReader<IntWritable, BytesWritable> { - - private FileSplit fileSplit; - private boolean processed = false; - private Configuration configuration; - private BytesWritable value = new BytesWritable(); - private IntWritable index; - private String fileFilterClassName = null; - private PathFilter pathFilter = null; - - public WholeFileRecordReader(CombineFileSplit fileSplit, TaskAttemptContext taskAttemptContext, Integer idx) - throws IOException { - this.fileSplit = new FileSplit(fileSplit.getPath(idx), fileSplit.getOffset(idx), - fileSplit.getLength(idx), fileSplit.getLocations()); - this.configuration = taskAttemptContext.getConfiguration(); - this.index = new IntWritable(idx); - this.fileFilterClassName = this.configuration.get(FILE_FILTER_CLASS_OPTION[0]); - } - - @Override - public IntWritable getCurrentKey() { - return index; - } - - @Override - public BytesWritable getCurrentValue() { - return value; - } - - @Override - public float getProgress() throws IOException { - return processed ? 
1.0f : 0.0f; - } - - @Override - public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) - throws IOException, InterruptedException { - if (!StringUtils.isBlank(fileFilterClassName) && - !PrefixAdditionFilter.class.getName().equals(fileFilterClassName)) { - try { - pathFilter = (PathFilter) Class.forName(fileFilterClassName).newInstance(); - } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) { - throw new IllegalStateException(e); - } - } - } - - @Override - public boolean nextKeyValue() throws IOException { - if (!processed) { - byte[] contents = new byte[(int) fileSplit.getLength()]; - Path file = fileSplit.getPath(); - FileSystem fs = file.getFileSystem(this.configuration); - - if (!fs.isFile(file)) { - return false; - } - - FileStatus[] fileStatuses; - if (pathFilter != null) { - fileStatuses = fs.listStatus(file, pathFilter); - } else { - fileStatuses = fs.listStatus(file); - } - - if (fileStatuses.length == 1) { - try (FSDataInputStream in = fs.open(fileStatuses[0].getPath())) { - IOUtils.readFully(in, contents, 0, contents.length); - value.setCapacity(contents.length); - value.set(contents, 0, contents.length); - } - processed = true; - return true; - } - } - return false; - } - - @Override - public void close() throws IOException { - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/integration/src/main/java/org/apache/mahout/text/WikipediaToSequenceFile.java ---------------------------------------------------------------------- diff --git a/integration/src/main/java/org/apache/mahout/text/WikipediaToSequenceFile.java b/integration/src/main/java/org/apache/mahout/text/WikipediaToSequenceFile.java deleted file mode 100644 index bed4640..0000000 --- a/integration/src/main/java/org/apache/mahout/text/WikipediaToSequenceFile.java +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.mahout.text; - -import java.io.File; -import java.io.IOException; -import java.util.HashSet; -import java.util.Locale; -import java.util.Set; - -import org.apache.commons.cli2.CommandLine; -import org.apache.commons.cli2.Group; -import org.apache.commons.cli2.Option; -import org.apache.commons.cli2.OptionException; -import org.apache.commons.cli2.builder.ArgumentBuilder; -import org.apache.commons.cli2.builder.DefaultOptionBuilder; -import org.apache.commons.cli2.builder.GroupBuilder; -import org.apache.commons.cli2.commandline.Parser; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.DefaultStringifier; -import org.apache.hadoop.io.Stringifier; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; -import org.apache.hadoop.util.GenericsUtil; -import org.apache.mahout.common.CommandLineUtil; -import org.apache.mahout.common.HadoopUtil; -import org.apache.mahout.common.commandline.DefaultOptionCreator; -import org.apache.mahout.common.iterator.FileLineIterable; -import org.apache.mahout.text.wikipedia.WikipediaMapper; -import org.apache.mahout.text.wikipedia.XmlInputFormat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Create and run the Wikipedia Dataset Creator. - */ -public final class WikipediaToSequenceFile { - - private static final Logger log = LoggerFactory.getLogger(WikipediaToSequenceFile.class); - - private WikipediaToSequenceFile() { } - - /** - * Takes in two arguments: - * <ol> - * <li>The input {@link org.apache.hadoop.fs.Path} where the input documents live</li> - * <li>The output {@link org.apache.hadoop.fs.Path} where to write the classifier as a - * {@link org.apache.hadoop.io.SequenceFile}</li> - * </ol> - */ - public static void main(String[] args) throws IOException { - DefaultOptionBuilder obuilder = new DefaultOptionBuilder(); - ArgumentBuilder abuilder = new ArgumentBuilder(); - GroupBuilder gbuilder = new GroupBuilder(); - - Option dirInputPathOpt = DefaultOptionCreator.inputOption().create(); - - Option dirOutputPathOpt = DefaultOptionCreator.outputOption().create(); - - Option categoriesOpt = obuilder.withLongName("categories").withArgument( - abuilder.withName("categories").withMinimum(1).withMaximum(1).create()).withDescription( - "Location of the categories file. One entry per line. " - + "Will be used to make a string match in Wikipedia Category field").withShortName("c").create(); - - Option exactMatchOpt = obuilder.withLongName("exactMatch").withDescription( - "If set, then the category name must exactly match the " - + "entry in the categories file. Default is false").withShortName("e").create(); - - Option allOpt = obuilder.withLongName("all") - .withDescription("If set, Select all files. Default is false").withShortName("all").create(); - - Option removeLabelOpt = obuilder.withLongName("removeLabels") - .withDescription("If set, remove [[Category:labels]] from document text after extracting label." 
- + "Default is false").withShortName("rl").create(); - - Option helpOpt = DefaultOptionCreator.helpOption(); - - Group group = gbuilder.withName("Options").withOption(categoriesOpt).withOption(dirInputPathOpt) - .withOption(dirOutputPathOpt).withOption(exactMatchOpt).withOption(allOpt).withOption(helpOpt) - .withOption(removeLabelOpt).create(); - - Parser parser = new Parser(); - parser.setGroup(group); - parser.setHelpOption(helpOpt); - try { - CommandLine cmdLine = parser.parse(args); - if (cmdLine.hasOption(helpOpt)) { - CommandLineUtil.printHelp(group); - return; - } - - String inputPath = (String) cmdLine.getValue(dirInputPathOpt); - String outputPath = (String) cmdLine.getValue(dirOutputPathOpt); - - String catFile = ""; - if (cmdLine.hasOption(categoriesOpt)) { - catFile = (String) cmdLine.getValue(categoriesOpt); - } - - boolean all = false; - if (cmdLine.hasOption(allOpt)) { - all = true; - } - - boolean removeLabels = false; - if (cmdLine.hasOption(removeLabelOpt)) { - removeLabels = true; - } - - runJob(inputPath, outputPath, catFile, cmdLine.hasOption(exactMatchOpt), all, removeLabels); - } catch (OptionException | InterruptedException | ClassNotFoundException e) { - log.error("Exception", e); - CommandLineUtil.printHelp(group); - } - } - - /** - * Run the job - * - * @param input - * the input pathname String - * @param output - * the output pathname String - * @param catFile - * the file containing the Wikipedia categories - * @param exactMatchOnly - * if true, then the Wikipedia category must match exactly instead of simply containing the - * category string - * @param all - * if true select all categories - * @param removeLabels - * if true remove Category labels from document text after extracting. - * - */ - public static void runJob(String input, - String output, - String catFile, - boolean exactMatchOnly, - boolean all, - boolean removeLabels) throws IOException, InterruptedException, ClassNotFoundException { - Configuration conf = new Configuration(); - conf.set("xmlinput.start", "<page>"); - conf.set("xmlinput.end", "</page>"); - conf.setBoolean("exact.match.only", exactMatchOnly); - conf.setBoolean("all.files", all); - conf.setBoolean("remove.labels", removeLabels); - conf.set("io.serializations", - "org.apache.hadoop.io.serializer.JavaSerialization," - + "org.apache.hadoop.io.serializer.WritableSerialization"); - - Set<String> categories = new HashSet<>(); - if (!catFile.isEmpty()) { - for (String line : new FileLineIterable(new File(catFile))) { - categories.add(line.trim().toLowerCase(Locale.ENGLISH)); - } - } - - Stringifier<Set<String>> setStringifier = - new DefaultStringifier<>(conf, GenericsUtil.getClass(categories)); - - String categoriesStr = setStringifier.toString(categories); - conf.set("wikipedia.categories", categoriesStr); - - Job job = new Job(conf); - log.info("Input: {} Out: {} Categories: {} All Files: {}", input, output, catFile, all); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(Text.class); - FileInputFormat.setInputPaths(job, new Path(input)); - Path outPath = new Path(output); - FileOutputFormat.setOutputPath(job, outPath); - job.setMapperClass(WikipediaMapper.class); - job.setInputFormatClass(XmlInputFormat.class); - job.setReducerClass(Reducer.class); - job.setOutputFormatClass(SequenceFileOutputFormat.class); - job.setJarByClass(WikipediaToSequenceFile.class); - - /* - * conf.set("mapred.compress.map.output", "true"); conf.set("mapred.map.output.compression.type", - * "BLOCK"); conf.set("mapred.output.compress", "true"); 
conf.set("mapred.output.compression.type", - * "BLOCK"); conf.set("mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec"); - */ - HadoopUtil.delete(conf, outPath); - - boolean succeeded = job.waitForCompletion(true); - if (!succeeded) { - throw new IllegalStateException("Job failed!"); - } - - } -} http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaAnalyzer.java ---------------------------------------------------------------------- diff --git a/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaAnalyzer.java b/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaAnalyzer.java deleted file mode 100644 index d50323d..0000000 --- a/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaAnalyzer.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.text.wikipedia; - -import org.apache.lucene.analysis.TokenStream; -import org.apache.lucene.analysis.Tokenizer; -import org.apache.lucene.analysis.core.LowerCaseFilter; -import org.apache.lucene.analysis.core.StopAnalyzer; -import org.apache.lucene.analysis.core.StopFilter; -import org.apache.lucene.analysis.standard.StandardFilter; -import org.apache.lucene.analysis.util.CharArraySet; -import org.apache.lucene.analysis.util.StopwordAnalyzerBase; -import org.apache.lucene.analysis.wikipedia.WikipediaTokenizer; - - -public class WikipediaAnalyzer extends StopwordAnalyzerBase { - - public WikipediaAnalyzer() { - super(StopAnalyzer.ENGLISH_STOP_WORDS_SET); - } - - public WikipediaAnalyzer(CharArraySet stopSet) { - super(stopSet); - } - - @Override - protected TokenStreamComponents createComponents(String fieldName) { - Tokenizer tokenizer = new WikipediaTokenizer(); - TokenStream result = new StandardFilter(tokenizer); - result = new LowerCaseFilter(result); - result = new StopFilter(result, getStopwordSet()); - return new TokenStreamComponents(tokenizer, result); - } -} http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorDriver.java ---------------------------------------------------------------------- diff --git a/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorDriver.java b/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorDriver.java deleted file mode 100644 index 8214407..0000000 --- a/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorDriver.java +++ /dev/null @@ -1,190 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.text.wikipedia; - -import java.io.File; -import java.io.IOException; -import java.util.HashSet; -import java.util.Locale; -import java.util.Set; - -import org.apache.commons.cli2.CommandLine; -import org.apache.commons.cli2.Group; -import org.apache.commons.cli2.Option; -import org.apache.commons.cli2.OptionException; -import org.apache.commons.cli2.builder.ArgumentBuilder; -import org.apache.commons.cli2.builder.DefaultOptionBuilder; -import org.apache.commons.cli2.builder.GroupBuilder; -import org.apache.commons.cli2.commandline.Parser; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.DefaultStringifier; -import org.apache.hadoop.io.Stringifier; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; -import org.apache.hadoop.util.GenericsUtil; -import org.apache.lucene.analysis.Analyzer; -import org.apache.mahout.common.ClassUtils; -import org.apache.mahout.common.CommandLineUtil; -import org.apache.mahout.common.HadoopUtil; -import org.apache.mahout.common.commandline.DefaultOptionCreator; -import org.apache.mahout.common.iterator.FileLineIterable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Create and run the Wikipedia Dataset Creator. - */ -public final class WikipediaDatasetCreatorDriver { - private static final Logger log = LoggerFactory.getLogger(WikipediaDatasetCreatorDriver.class); - - private WikipediaDatasetCreatorDriver() { } - - /** - * Takes in two arguments: - * <ol> - * <li>The input {@link org.apache.hadoop.fs.Path} where the input documents live</li> - * <li>The output {@link org.apache.hadoop.fs.Path} where to write the classifier as a - * {@link org.apache.hadoop.io.SequenceFile}</li> - * </ol> - */ - public static void main(String[] args) throws IOException, InterruptedException { - DefaultOptionBuilder obuilder = new DefaultOptionBuilder(); - ArgumentBuilder abuilder = new ArgumentBuilder(); - GroupBuilder gbuilder = new GroupBuilder(); - - Option dirInputPathOpt = DefaultOptionCreator.inputOption().create(); - - Option dirOutputPathOpt = DefaultOptionCreator.outputOption().create(); - - Option categoriesOpt = obuilder.withLongName("categories").withRequired(true).withArgument( - abuilder.withName("categories").withMinimum(1).withMaximum(1).create()).withDescription( - "Location of the categories file. One entry per line. 
" - + "Will be used to make a string match in Wikipedia Category field").withShortName("c").create(); - - Option exactMatchOpt = obuilder.withLongName("exactMatch").withDescription( - "If set, then the category name must exactly match the " - + "entry in the categories file. Default is false").withShortName("e").create(); - Option analyzerOpt = obuilder.withLongName("analyzer").withRequired(false).withArgument( - abuilder.withName("analyzer").withMinimum(1).withMaximum(1).create()).withDescription( - "The analyzer to use, must have a no argument constructor").withShortName("a").create(); - Option helpOpt = DefaultOptionCreator.helpOption(); - - Group group = gbuilder.withName("Options").withOption(categoriesOpt).withOption(dirInputPathOpt) - .withOption(dirOutputPathOpt).withOption(exactMatchOpt).withOption(analyzerOpt).withOption(helpOpt) - .create(); - - Parser parser = new Parser(); - parser.setGroup(group); - try { - CommandLine cmdLine = parser.parse(args); - if (cmdLine.hasOption(helpOpt)) { - CommandLineUtil.printHelp(group); - return; - } - - String inputPath = (String) cmdLine.getValue(dirInputPathOpt); - String outputPath = (String) cmdLine.getValue(dirOutputPathOpt); - String catFile = (String) cmdLine.getValue(categoriesOpt); - Class<? extends Analyzer> analyzerClass = WikipediaAnalyzer.class; - if (cmdLine.hasOption(analyzerOpt)) { - String className = cmdLine.getValue(analyzerOpt).toString(); - analyzerClass = Class.forName(className).asSubclass(Analyzer.class); - // try instantiating it, b/c there isn't any point in setting it if - // you can't instantiate it - ClassUtils.instantiateAs(analyzerClass, Analyzer.class); - } - runJob(inputPath, outputPath, catFile, cmdLine.hasOption(exactMatchOpt), - analyzerClass); - } catch (OptionException e) { - log.error("Exception", e); - CommandLineUtil.printHelp(group); - } catch (ClassNotFoundException e) { - log.error("Exception", e); - CommandLineUtil.printHelp(group); - } - } - - /** - * Run the job - * - * @param input - * the input pathname String - * @param output - * the output pathname String - * @param catFile - * the file containing the Wikipedia categories - * @param exactMatchOnly - * if true, then the Wikipedia category must match exactly instead of simply containing the - * category string - */ - public static void runJob(String input, - String output, - String catFile, - boolean exactMatchOnly, - Class<? extends Analyzer> analyzerClass) - throws IOException, InterruptedException, ClassNotFoundException { - Configuration conf = new Configuration(); - conf.set("key.value.separator.in.input.line", " "); - conf.set("xmlinput.start", "<page>"); - conf.set("xmlinput.end", "</page>"); - conf.setBoolean("exact.match.only", exactMatchOnly); - conf.set("analyzer.class", analyzerClass.getName()); - conf.set("io.serializations", - "org.apache.hadoop.io.serializer.JavaSerialization," - + "org.apache.hadoop.io.serializer.WritableSerialization"); - // Dont ever forget this. 
People should keep track of how hadoop conf - // parameters can make or break a piece of code - - Set<String> categories = new HashSet<>(); - for (String line : new FileLineIterable(new File(catFile))) { - categories.add(line.trim().toLowerCase(Locale.ENGLISH)); - } - - Stringifier<Set<String>> setStringifier = - new DefaultStringifier<>(conf, GenericsUtil.getClass(categories)); - - String categoriesStr = setStringifier.toString(categories); - - conf.set("wikipedia.categories", categoriesStr); - - Job job = new Job(conf); - log.info("Input: {} Out: {} Categories: {}", input, output, catFile); - job.setJarByClass(WikipediaDatasetCreatorDriver.class); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(Text.class); - job.setMapperClass(WikipediaDatasetCreatorMapper.class); - //TODO: job.setNumMapTasks(100); - job.setInputFormatClass(XmlInputFormat.class); - job.setReducerClass(WikipediaDatasetCreatorReducer.class); - job.setOutputFormatClass(TextOutputFormat.class); - - FileInputFormat.setInputPaths(job, new Path(input)); - Path outPath = new Path(output); - FileOutputFormat.setOutputPath(job, outPath); - HadoopUtil.delete(conf, outPath); - - boolean succeeded = job.waitForCompletion(true); - if (!succeeded) { - throw new IllegalStateException("Job failed!"); - } - } -}
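Both of the removed Wikipedia drivers expose a static runJob entry point alongside their command-line main methods, and the signatures are visible in the diffs above. The following is only an illustrative sketch of how a caller might have driven them programmatically before this removal; the paths and categories file name are placeholders, not values taken from this commit.

import org.apache.mahout.text.WikipediaToSequenceFile;
import org.apache.mahout.text.wikipedia.WikipediaAnalyzer;
import org.apache.mahout.text.wikipedia.WikipediaDatasetCreatorDriver;

public class WikipediaJobsExample {
  public static void main(String[] args) throws Exception {
    // Hypothetical paths; anything reachable through the default Hadoop Configuration would do.
    String wikipediaXml = "wikipedia/chunks";     // directory of <page>...</page> XML chunks
    String seqFileOut   = "wikipedia/sequence";   // SequenceFile output of WikipediaToSequenceFile
    String datasetOut   = "wikipedia/dataset";    // text output of WikipediaDatasetCreatorDriver
    String categories   = "categories.txt";       // one category per line, read as a local file by both jobs

    // Signature from the removed WikipediaToSequenceFile:
    //   runJob(input, output, catFile, exactMatchOnly, all, removeLabels)
    WikipediaToSequenceFile.runJob(wikipediaXml, seqFileOut, categories,
        /* exactMatchOnly */ false, /* all */ true, /* removeLabels */ false);

    // Signature from the removed WikipediaDatasetCreatorDriver:
    //   runJob(input, output, catFile, exactMatchOnly, analyzerClass)
    WikipediaDatasetCreatorDriver.runJob(wikipediaXml, datasetOut, categories,
        /* exactMatchOnly */ false, WikipediaAnalyzer.class);
  }
}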

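For anyone porting the removed TextParagraphSplittingJob, the essential behavior lives in SplitMap's double-newline scan: it emits the slice from the current position up to the next "\n\n", and a final paragraph that is not followed by a blank line is never written. Below is a minimal, self-contained sketch of that scan outside of MapReduce, using char offsets in place of the original's Text byte offsets (equivalent for ASCII input); it is an illustration, not part of this commit.

import java.util.ArrayList;
import java.util.List;

public class ParagraphSplitSketch {

  // Mirrors the loop in the removed SplitMap: after the first paragraph, each emitted
  // slice still begins with the "\n\n" separator, exactly as the original offset
  // arithmetic produced, and any text after the last separator is dropped.
  static List<String> splitParagraphs(String text) {
    List<String> paragraphs = new ArrayList<>();
    int loc = 0;
    while (loc >= 0 && loc < text.length()) {
      int nextLoc = text.indexOf("\n\n", loc + 1);
      if (nextLoc > 0) {
        paragraphs.add(text.substring(loc, nextLoc));
      }
      loc = nextLoc;
    }
    return paragraphs;
  }

  public static void main(String[] args) {
    String doc = "First paragraph.\n\nSecond paragraph.\n\nTrailing text with no blank line after it";
    // Prints two paragraphs; the trailing text is dropped, matching the mapper's behavior.
    splitParagraphs(doc).forEach(System.out::println);
  }
}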