http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java new file mode 100644 index 0000000..203e8fb --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java @@ -0,0 +1,244 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.text; + +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit; +import org.apache.mahout.common.HadoopUtil; +import org.apache.mahout.common.iterator.FileLineIterable; +import org.apache.mahout.utils.email.MailOptions; +import org.apache.mahout.utils.email.MailProcessor; + +import java.io.ByteArrayInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static org.apache.mahout.text.SequenceFilesFromMailArchives.BODY_OPTION; +import static org.apache.mahout.text.SequenceFilesFromMailArchives.BODY_SEPARATOR_OPTION; +import static org.apache.mahout.text.SequenceFilesFromMailArchives.CHARSET_OPTION; +import static org.apache.mahout.text.SequenceFilesFromMailArchives.CHUNK_SIZE_OPTION; +import static org.apache.mahout.text.SequenceFilesFromMailArchives.FROM_OPTION; +import static org.apache.mahout.text.SequenceFilesFromMailArchives.KEY_PREFIX_OPTION; +import static org.apache.mahout.text.SequenceFilesFromMailArchives.QUOTED_REGEX_OPTION; +import static org.apache.mahout.text.SequenceFilesFromMailArchives.REFERENCES_OPTION; +import static org.apache.mahout.text.SequenceFilesFromMailArchives.SEPARATOR_OPTION; +import static org.apache.mahout.text.SequenceFilesFromMailArchives.STRIP_QUOTED_OPTION; +import static org.apache.mahout.text.SequenceFilesFromMailArchives.SUBJECT_OPTION; +import static org.apache.mahout.text.SequenceFilesFromMailArchives.TO_OPTION; + +/** + * Map Class for the SequenceFilesFromMailArchives 
job + */ +public class SequenceFilesFromMailArchivesMapper extends Mapper<IntWritable, BytesWritable, Text, Text> { + + private Text outKey = new Text(); + private Text outValue = new Text(); + + private static final Pattern MESSAGE_START = Pattern.compile( + "^From \\S+@\\S.*\\d{4}$", Pattern.CASE_INSENSITIVE); + private static final Pattern MESSAGE_ID_PREFIX = Pattern.compile( + "^message-id: <(.*)>$", Pattern.CASE_INSENSITIVE); + + private MailOptions options; + + @Override + public void setup(Context context) throws IOException, InterruptedException { + + Configuration configuration = context.getConfiguration(); + + // absorb all of the options into the MailOptions object + this.options = new MailOptions(); + + options.setPrefix(configuration.get(KEY_PREFIX_OPTION[1], "")); + + if (!configuration.get(CHUNK_SIZE_OPTION[0], "").equals("")) { + options.setChunkSize(configuration.getInt(CHUNK_SIZE_OPTION[0], 64)); + } + + if (!configuration.get(CHARSET_OPTION[0], "").equals("")) { + Charset charset = Charset.forName(configuration.get(CHARSET_OPTION[0], "UTF-8")); + options.setCharset(charset); + } else { + Charset charset = Charset.forName("UTF-8"); + options.setCharset(charset); + } + + List<Pattern> patterns = Lists.newArrayListWithCapacity(5); + // patternOrder is used downstream so that we can know what order the + // text is in instead + // of encoding it in the string, which + // would require more processing later to remove it pre feature + // selection. + Map<String, Integer> patternOrder = Maps.newHashMap(); + int order = 0; + if (!configuration.get(FROM_OPTION[1], "").equals("")) { + patterns.add(MailProcessor.FROM_PREFIX); + patternOrder.put(MailOptions.FROM, order++); + } + + if (!configuration.get(TO_OPTION[1], "").equals("")) { + patterns.add(MailProcessor.TO_PREFIX); + patternOrder.put(MailOptions.TO, order++); + } + + if (!configuration.get(REFERENCES_OPTION[1], "").equals("")) { + patterns.add(MailProcessor.REFS_PREFIX); + patternOrder.put(MailOptions.REFS, order++); + } + + if (!configuration.get(SUBJECT_OPTION[1], "").equals("")) { + patterns.add(MailProcessor.SUBJECT_PREFIX); + patternOrder.put(MailOptions.SUBJECT, order += 1); + } + + options.setStripQuotedText(configuration.getBoolean(STRIP_QUOTED_OPTION[1], false)); + + options.setPatternsToMatch(patterns.toArray(new Pattern[patterns.size()])); + options.setPatternOrder(patternOrder); + + options.setIncludeBody(configuration.getBoolean(BODY_OPTION[1], false)); + + options.setSeparator("\n"); + if (!configuration.get(SEPARATOR_OPTION[1], "").equals("")) { + options.setSeparator(configuration.get(SEPARATOR_OPTION[1], "")); + } + if (!configuration.get(BODY_SEPARATOR_OPTION[1], "").equals("")) { + options.setBodySeparator(configuration.get(BODY_SEPARATOR_OPTION[1], "")); + } + if (!configuration.get(QUOTED_REGEX_OPTION[1], "").equals("")) { + options.setQuotedTextPattern(Pattern.compile(configuration.get(QUOTED_REGEX_OPTION[1], ""))); + } + + } + + public long parseMailboxLineByLine(String filename, InputStream mailBoxInputStream, Context context) + throws IOException, InterruptedException { + long messageCount = 0; + try { + StringBuilder contents = new StringBuilder(); + StringBuilder body = new StringBuilder(); + Matcher messageIdMatcher = MESSAGE_ID_PREFIX.matcher(""); + Matcher messageBoundaryMatcher = MESSAGE_START.matcher(""); + String[] patternResults = new String[options.getPatternsToMatch().length]; + Matcher[] matches = new Matcher[options.getPatternsToMatch().length]; + for (int i = 0; i < matches.length; 
i++) { + matches[i] = options.getPatternsToMatch()[i].matcher(""); + } + + String messageId = null; + boolean inBody = false; + Pattern quotedTextPattern = options.getQuotedTextPattern(); + + for (String nextLine : new FileLineIterable(mailBoxInputStream, options.getCharset(), false, filename)) { + if (!options.isStripQuotedText() || !quotedTextPattern.matcher(nextLine).find()) { + for (int i = 0; i < matches.length; i++) { + Matcher matcher = matches[i]; + matcher.reset(nextLine); + if (matcher.matches()) { + patternResults[i] = matcher.group(1); + } + } + + // only start appending body content after we've seen a message ID + if (messageId != null) { + // first, see if we hit the end of the message + messageBoundaryMatcher.reset(nextLine); + if (messageBoundaryMatcher.matches()) { + // done parsing this message ... write it out + String key = generateKey(filename, options.getPrefix(), messageId); + // if this ordering changes, then also change + // FromEmailToDictionaryMapper + writeContent(options.getSeparator(), contents, body, patternResults); + + this.outKey.set(key); + this.outValue.set(contents.toString()); + context.write(this.outKey, this.outValue); + contents.setLength(0); // reset the buffer + body.setLength(0); + messageId = null; + inBody = false; + } else { + if (inBody && options.isIncludeBody()) { + if (!nextLine.isEmpty()) { + body.append(nextLine).append(options.getBodySeparator()); + } + } else { + // first empty line we see after reading the message Id + // indicates that we are in the body ... + inBody = nextLine.isEmpty(); + } + } + } else { + if (nextLine.length() > 14) { + messageIdMatcher.reset(nextLine); + if (messageIdMatcher.matches()) { + messageId = messageIdMatcher.group(1); + ++messageCount; + } + } + } + } + } + // write the last message in the file if available + if (messageId != null) { + String key = generateKey(filename, options.getPrefix(), messageId); + writeContent(options.getSeparator(), contents, body, patternResults); + this.outKey.set(key); + this.outValue.set(contents.toString()); + context.write(this.outKey, this.outValue); + contents.setLength(0); // reset the buffer + } + } catch (FileNotFoundException ignored) { + + } + return messageCount; + } + + protected static String generateKey(String mboxFilename, String prefix, String messageId) { + return Joiner.on(Path.SEPARATOR).join(Lists.newArrayList(prefix, mboxFilename, messageId).iterator()); + } + + private static void writeContent(String separator, StringBuilder contents, CharSequence body, String[] matches) { + String matchesString = Joiner.on(separator).useForNull("").join(Arrays.asList(matches).iterator()); + contents.append(matchesString).append(separator).append(body); + } + + public void map(IntWritable key, BytesWritable value, Context context) + throws IOException, InterruptedException { + Configuration configuration = context.getConfiguration(); + Path filePath = ((CombineFileSplit) context.getInputSplit()).getPath(key.get()); + String relativeFilePath = HadoopUtil.calcRelativeFilePath(configuration, filePath); + ByteArrayInputStream is = new ByteArrayInputStream(value.getBytes()); + parseMailboxLineByLine(relativeFilePath, is, context); + } +}
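For readers following the mapper above, here is a minimal illustrative sketch (not part of the patch) of the sequence-file key layout that generateKey() produces: prefix, relative mbox path, and Message-ID joined with Hadoop's Path.SEPARATOR ("/"). The path and prefix values are hypothetical, and the snippet assumes it is compiled in the same org.apache.mahout.text package so the protected helper is visible.

package org.apache.mahout.text;

/**
 * Illustrative only: shows the key emitted per message by
 * SequenceFilesFromMailArchivesMapper. All argument values are hypothetical.
 */
public final class MailArchiveKeyExample {
  public static void main(String[] args) {
    // generateKey(mboxFilename, prefix, messageId) joins prefix, relative file
    // path and Message-ID with Path.SEPARATOR ("/").
    String key = SequenceFilesFromMailArchivesMapper.generateKey(
        "lists/dev/2010-01.mbox",            // relative mbox path (hypothetical)
        "apache-dev",                        // key prefix option (hypothetical)
        "1234567890.ABC@example.org");       // Message-ID parsed from the mail header
    System.out.println(key);
    // Prints: apache-dev/lists/dev/2010-01.mbox/1234567890.ABC@example.org
  }
}

Each emitted value is the matched header fields followed by the body, joined by the configured separator, which is why downstream consumers depend on the field ordering noted in the mapper's comments.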
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java new file mode 100644 index 0000000..cacfd22 --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.text; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.hadoop.util.ToolRunner; +import org.apache.mahout.common.AbstractJob; + +import java.io.IOException; + +public class TextParagraphSplittingJob extends AbstractJob { + + @Override + public int run(String[] strings) throws Exception { + Configuration originalConf = getConf(); + Job job = prepareJob(new Path(originalConf.get("mapred.input.dir")), + new Path(originalConf.get("mapred.output.dir")), + SequenceFileInputFormat.class, + SplitMap.class, + Text.class, + Text.class, + Reducer.class, + Text.class, + Text.class, + SequenceFileOutputFormat.class); + job.setNumReduceTasks(0); + boolean succeeded = job.waitForCompletion(true); + return succeeded ? 
0 : -1; + } + + public static class SplitMap extends Mapper<Text,Text,Text,Text> { + + @Override + protected void map(Text key, Text text, Context context) throws IOException, InterruptedException { + Text outText = new Text(); + int loc = 0; + while (loc >= 0 && loc < text.getLength()) { + int nextLoc = text.find("\n\n", loc + 1); + if (nextLoc > 0) { + outText.set(text.getBytes(), loc, nextLoc - loc); + context.write(key, outText); + } + loc = nextLoc; + } + } + } + + public static void main(String[] args) throws Exception { + ToolRunner.run(new TextParagraphSplittingJob(), args); + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/WholeFileRecordReader.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/WholeFileRecordReader.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/WholeFileRecordReader.java new file mode 100644 index 0000000..b8441b7 --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/WholeFileRecordReader.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.text; + +import java.io.IOException; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; + +import static org.apache.mahout.text.SequenceFilesFromDirectory.FILE_FILTER_CLASS_OPTION; + +/** + * RecordReader used with the MultipleTextFileInputFormat class to read full files as + * k/v pairs and groups of files as single input splits. 
+ */ +public class WholeFileRecordReader extends RecordReader<IntWritable, BytesWritable> { + + private FileSplit fileSplit; + private boolean processed = false; + private Configuration configuration; + private BytesWritable value = new BytesWritable(); + private IntWritable index; + private String fileFilterClassName = null; + private PathFilter pathFilter = null; + + public WholeFileRecordReader(CombineFileSplit fileSplit, TaskAttemptContext taskAttemptContext, Integer idx) + throws IOException { + this.fileSplit = new FileSplit(fileSplit.getPath(idx), fileSplit.getOffset(idx), + fileSplit.getLength(idx), fileSplit.getLocations()); + this.configuration = taskAttemptContext.getConfiguration(); + this.index = new IntWritable(idx); + this.fileFilterClassName = this.configuration.get(FILE_FILTER_CLASS_OPTION[0]); + } + + @Override + public IntWritable getCurrentKey() { + return index; + } + + @Override + public BytesWritable getCurrentValue() { + return value; + } + + @Override + public float getProgress() throws IOException { + return processed ? 1.0f : 0.0f; + } + + @Override + public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) + throws IOException, InterruptedException { + if (!StringUtils.isBlank(fileFilterClassName) && + !PrefixAdditionFilter.class.getName().equals(fileFilterClassName)) { + try { + pathFilter = (PathFilter) Class.forName(fileFilterClassName).newInstance(); + } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) { + throw new IllegalStateException(e); + } + } + } + + @Override + public boolean nextKeyValue() throws IOException { + if (!processed) { + byte[] contents = new byte[(int) fileSplit.getLength()]; + Path file = fileSplit.getPath(); + FileSystem fs = file.getFileSystem(this.configuration); + + if (!fs.isFile(file)) { + return false; + } + + FileStatus[] fileStatuses; + if (pathFilter != null) { + fileStatuses = fs.listStatus(file, pathFilter); + } else { + fileStatuses = fs.listStatus(file); + } + + if (fileStatuses.length == 1) { + try (FSDataInputStream in = fs.open(fileStatuses[0].getPath())) { + IOUtils.readFully(in, contents, 0, contents.length); + value.setCapacity(contents.length); + value.set(contents, 0, contents.length); + } + processed = true; + return true; + } + } + return false; + } + + @Override + public void close() throws IOException { + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/WikipediaToSequenceFile.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/WikipediaToSequenceFile.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/WikipediaToSequenceFile.java new file mode 100644 index 0000000..bed4640 --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/WikipediaToSequenceFile.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.text; + +import java.io.File; +import java.io.IOException; +import java.util.HashSet; +import java.util.Locale; +import java.util.Set; + +import org.apache.commons.cli2.CommandLine; +import org.apache.commons.cli2.Group; +import org.apache.commons.cli2.Option; +import org.apache.commons.cli2.OptionException; +import org.apache.commons.cli2.builder.ArgumentBuilder; +import org.apache.commons.cli2.builder.DefaultOptionBuilder; +import org.apache.commons.cli2.builder.GroupBuilder; +import org.apache.commons.cli2.commandline.Parser; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DefaultStringifier; +import org.apache.hadoop.io.Stringifier; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.hadoop.util.GenericsUtil; +import org.apache.mahout.common.CommandLineUtil; +import org.apache.mahout.common.HadoopUtil; +import org.apache.mahout.common.commandline.DefaultOptionCreator; +import org.apache.mahout.common.iterator.FileLineIterable; +import org.apache.mahout.text.wikipedia.WikipediaMapper; +import org.apache.mahout.text.wikipedia.XmlInputFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Create and run the Wikipedia Dataset Creator. + */ +public final class WikipediaToSequenceFile { + + private static final Logger log = LoggerFactory.getLogger(WikipediaToSequenceFile.class); + + private WikipediaToSequenceFile() { } + + /** + * Takes in two arguments: + * <ol> + * <li>The input {@link org.apache.hadoop.fs.Path} where the input documents live</li> + * <li>The output {@link org.apache.hadoop.fs.Path} where to write the classifier as a + * {@link org.apache.hadoop.io.SequenceFile}</li> + * </ol> + */ + public static void main(String[] args) throws IOException { + DefaultOptionBuilder obuilder = new DefaultOptionBuilder(); + ArgumentBuilder abuilder = new ArgumentBuilder(); + GroupBuilder gbuilder = new GroupBuilder(); + + Option dirInputPathOpt = DefaultOptionCreator.inputOption().create(); + + Option dirOutputPathOpt = DefaultOptionCreator.outputOption().create(); + + Option categoriesOpt = obuilder.withLongName("categories").withArgument( + abuilder.withName("categories").withMinimum(1).withMaximum(1).create()).withDescription( + "Location of the categories file. One entry per line. " + + "Will be used to make a string match in Wikipedia Category field").withShortName("c").create(); + + Option exactMatchOpt = obuilder.withLongName("exactMatch").withDescription( + "If set, then the category name must exactly match the " + + "entry in the categories file. Default is false").withShortName("e").create(); + + Option allOpt = obuilder.withLongName("all") + .withDescription("If set, Select all files. 
Default is false").withShortName("all").create(); + + Option removeLabelOpt = obuilder.withLongName("removeLabels") + .withDescription("If set, remove [[Category:labels]] from document text after extracting label." + + "Default is false").withShortName("rl").create(); + + Option helpOpt = DefaultOptionCreator.helpOption(); + + Group group = gbuilder.withName("Options").withOption(categoriesOpt).withOption(dirInputPathOpt) + .withOption(dirOutputPathOpt).withOption(exactMatchOpt).withOption(allOpt).withOption(helpOpt) + .withOption(removeLabelOpt).create(); + + Parser parser = new Parser(); + parser.setGroup(group); + parser.setHelpOption(helpOpt); + try { + CommandLine cmdLine = parser.parse(args); + if (cmdLine.hasOption(helpOpt)) { + CommandLineUtil.printHelp(group); + return; + } + + String inputPath = (String) cmdLine.getValue(dirInputPathOpt); + String outputPath = (String) cmdLine.getValue(dirOutputPathOpt); + + String catFile = ""; + if (cmdLine.hasOption(categoriesOpt)) { + catFile = (String) cmdLine.getValue(categoriesOpt); + } + + boolean all = false; + if (cmdLine.hasOption(allOpt)) { + all = true; + } + + boolean removeLabels = false; + if (cmdLine.hasOption(removeLabelOpt)) { + removeLabels = true; + } + + runJob(inputPath, outputPath, catFile, cmdLine.hasOption(exactMatchOpt), all, removeLabels); + } catch (OptionException | InterruptedException | ClassNotFoundException e) { + log.error("Exception", e); + CommandLineUtil.printHelp(group); + } + } + + /** + * Run the job + * + * @param input + * the input pathname String + * @param output + * the output pathname String + * @param catFile + * the file containing the Wikipedia categories + * @param exactMatchOnly + * if true, then the Wikipedia category must match exactly instead of simply containing the + * category string + * @param all + * if true select all categories + * @param removeLabels + * if true remove Category labels from document text after extracting. 
+ * + */ + public static void runJob(String input, + String output, + String catFile, + boolean exactMatchOnly, + boolean all, + boolean removeLabels) throws IOException, InterruptedException, ClassNotFoundException { + Configuration conf = new Configuration(); + conf.set("xmlinput.start", "<page>"); + conf.set("xmlinput.end", "</page>"); + conf.setBoolean("exact.match.only", exactMatchOnly); + conf.setBoolean("all.files", all); + conf.setBoolean("remove.labels", removeLabels); + conf.set("io.serializations", + "org.apache.hadoop.io.serializer.JavaSerialization," + + "org.apache.hadoop.io.serializer.WritableSerialization"); + + Set<String> categories = new HashSet<>(); + if (!catFile.isEmpty()) { + for (String line : new FileLineIterable(new File(catFile))) { + categories.add(line.trim().toLowerCase(Locale.ENGLISH)); + } + } + + Stringifier<Set<String>> setStringifier = + new DefaultStringifier<>(conf, GenericsUtil.getClass(categories)); + + String categoriesStr = setStringifier.toString(categories); + conf.set("wikipedia.categories", categoriesStr); + + Job job = new Job(conf); + log.info("Input: {} Out: {} Categories: {} All Files: {}", input, output, catFile, all); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(Text.class); + FileInputFormat.setInputPaths(job, new Path(input)); + Path outPath = new Path(output); + FileOutputFormat.setOutputPath(job, outPath); + job.setMapperClass(WikipediaMapper.class); + job.setInputFormatClass(XmlInputFormat.class); + job.setReducerClass(Reducer.class); + job.setOutputFormatClass(SequenceFileOutputFormat.class); + job.setJarByClass(WikipediaToSequenceFile.class); + + /* + * conf.set("mapred.compress.map.output", "true"); conf.set("mapred.map.output.compression.type", + * "BLOCK"); conf.set("mapred.output.compress", "true"); conf.set("mapred.output.compression.type", + * "BLOCK"); conf.set("mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec"); + */ + HadoopUtil.delete(conf, outPath); + + boolean succeeded = job.waitForCompletion(true); + if (!succeeded) { + throw new IllegalStateException("Job failed!"); + } + + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaAnalyzer.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaAnalyzer.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaAnalyzer.java new file mode 100644 index 0000000..d50323d --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaAnalyzer.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.text.wikipedia; + +import org.apache.lucene.analysis.TokenStream; +import org.apache.lucene.analysis.Tokenizer; +import org.apache.lucene.analysis.core.LowerCaseFilter; +import org.apache.lucene.analysis.core.StopAnalyzer; +import org.apache.lucene.analysis.core.StopFilter; +import org.apache.lucene.analysis.standard.StandardFilter; +import org.apache.lucene.analysis.util.CharArraySet; +import org.apache.lucene.analysis.util.StopwordAnalyzerBase; +import org.apache.lucene.analysis.wikipedia.WikipediaTokenizer; + + +public class WikipediaAnalyzer extends StopwordAnalyzerBase { + + public WikipediaAnalyzer() { + super(StopAnalyzer.ENGLISH_STOP_WORDS_SET); + } + + public WikipediaAnalyzer(CharArraySet stopSet) { + super(stopSet); + } + + @Override + protected TokenStreamComponents createComponents(String fieldName) { + Tokenizer tokenizer = new WikipediaTokenizer(); + TokenStream result = new StandardFilter(tokenizer); + result = new LowerCaseFilter(result); + result = new StopFilter(result, getStopwordSet()); + return new TokenStreamComponents(tokenizer, result); + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorDriver.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorDriver.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorDriver.java new file mode 100644 index 0000000..8214407 --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorDriver.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.text.wikipedia; + +import java.io.File; +import java.io.IOException; +import java.util.HashSet; +import java.util.Locale; +import java.util.Set; + +import org.apache.commons.cli2.CommandLine; +import org.apache.commons.cli2.Group; +import org.apache.commons.cli2.Option; +import org.apache.commons.cli2.OptionException; +import org.apache.commons.cli2.builder.ArgumentBuilder; +import org.apache.commons.cli2.builder.DefaultOptionBuilder; +import org.apache.commons.cli2.builder.GroupBuilder; +import org.apache.commons.cli2.commandline.Parser; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DefaultStringifier; +import org.apache.hadoop.io.Stringifier; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.util.GenericsUtil; +import org.apache.lucene.analysis.Analyzer; +import org.apache.mahout.common.ClassUtils; +import org.apache.mahout.common.CommandLineUtil; +import org.apache.mahout.common.HadoopUtil; +import org.apache.mahout.common.commandline.DefaultOptionCreator; +import org.apache.mahout.common.iterator.FileLineIterable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Create and run the Wikipedia Dataset Creator. + */ +public final class WikipediaDatasetCreatorDriver { + private static final Logger log = LoggerFactory.getLogger(WikipediaDatasetCreatorDriver.class); + + private WikipediaDatasetCreatorDriver() { } + + /** + * Takes in two arguments: + * <ol> + * <li>The input {@link org.apache.hadoop.fs.Path} where the input documents live</li> + * <li>The output {@link org.apache.hadoop.fs.Path} where to write the classifier as a + * {@link org.apache.hadoop.io.SequenceFile}</li> + * </ol> + */ + public static void main(String[] args) throws IOException, InterruptedException { + DefaultOptionBuilder obuilder = new DefaultOptionBuilder(); + ArgumentBuilder abuilder = new ArgumentBuilder(); + GroupBuilder gbuilder = new GroupBuilder(); + + Option dirInputPathOpt = DefaultOptionCreator.inputOption().create(); + + Option dirOutputPathOpt = DefaultOptionCreator.outputOption().create(); + + Option categoriesOpt = obuilder.withLongName("categories").withRequired(true).withArgument( + abuilder.withName("categories").withMinimum(1).withMaximum(1).create()).withDescription( + "Location of the categories file. One entry per line. " + + "Will be used to make a string match in Wikipedia Category field").withShortName("c").create(); + + Option exactMatchOpt = obuilder.withLongName("exactMatch").withDescription( + "If set, then the category name must exactly match the " + + "entry in the categories file. 
Default is false").withShortName("e").create(); + Option analyzerOpt = obuilder.withLongName("analyzer").withRequired(false).withArgument( + abuilder.withName("analyzer").withMinimum(1).withMaximum(1).create()).withDescription( + "The analyzer to use, must have a no argument constructor").withShortName("a").create(); + Option helpOpt = DefaultOptionCreator.helpOption(); + + Group group = gbuilder.withName("Options").withOption(categoriesOpt).withOption(dirInputPathOpt) + .withOption(dirOutputPathOpt).withOption(exactMatchOpt).withOption(analyzerOpt).withOption(helpOpt) + .create(); + + Parser parser = new Parser(); + parser.setGroup(group); + try { + CommandLine cmdLine = parser.parse(args); + if (cmdLine.hasOption(helpOpt)) { + CommandLineUtil.printHelp(group); + return; + } + + String inputPath = (String) cmdLine.getValue(dirInputPathOpt); + String outputPath = (String) cmdLine.getValue(dirOutputPathOpt); + String catFile = (String) cmdLine.getValue(categoriesOpt); + Class<? extends Analyzer> analyzerClass = WikipediaAnalyzer.class; + if (cmdLine.hasOption(analyzerOpt)) { + String className = cmdLine.getValue(analyzerOpt).toString(); + analyzerClass = Class.forName(className).asSubclass(Analyzer.class); + // try instantiating it, b/c there isn't any point in setting it if + // you can't instantiate it + ClassUtils.instantiateAs(analyzerClass, Analyzer.class); + } + runJob(inputPath, outputPath, catFile, cmdLine.hasOption(exactMatchOpt), + analyzerClass); + } catch (OptionException e) { + log.error("Exception", e); + CommandLineUtil.printHelp(group); + } catch (ClassNotFoundException e) { + log.error("Exception", e); + CommandLineUtil.printHelp(group); + } + } + + /** + * Run the job + * + * @param input + * the input pathname String + * @param output + * the output pathname String + * @param catFile + * the file containing the Wikipedia categories + * @param exactMatchOnly + * if true, then the Wikipedia category must match exactly instead of simply containing the + * category string + */ + public static void runJob(String input, + String output, + String catFile, + boolean exactMatchOnly, + Class<? extends Analyzer> analyzerClass) + throws IOException, InterruptedException, ClassNotFoundException { + Configuration conf = new Configuration(); + conf.set("key.value.separator.in.input.line", " "); + conf.set("xmlinput.start", "<page>"); + conf.set("xmlinput.end", "</page>"); + conf.setBoolean("exact.match.only", exactMatchOnly); + conf.set("analyzer.class", analyzerClass.getName()); + conf.set("io.serializations", + "org.apache.hadoop.io.serializer.JavaSerialization," + + "org.apache.hadoop.io.serializer.WritableSerialization"); + // Dont ever forget this. 
People should keep track of how hadoop conf + // parameters can make or break a piece of code + + Set<String> categories = new HashSet<>(); + for (String line : new FileLineIterable(new File(catFile))) { + categories.add(line.trim().toLowerCase(Locale.ENGLISH)); + } + + Stringifier<Set<String>> setStringifier = + new DefaultStringifier<>(conf, GenericsUtil.getClass(categories)); + + String categoriesStr = setStringifier.toString(categories); + + conf.set("wikipedia.categories", categoriesStr); + + Job job = new Job(conf); + log.info("Input: {} Out: {} Categories: {}", input, output, catFile); + job.setJarByClass(WikipediaDatasetCreatorDriver.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(Text.class); + job.setMapperClass(WikipediaDatasetCreatorMapper.class); + //TODO: job.setNumMapTasks(100); + job.setInputFormatClass(XmlInputFormat.class); + job.setReducerClass(WikipediaDatasetCreatorReducer.class); + job.setOutputFormatClass(TextOutputFormat.class); + + FileInputFormat.setInputPaths(job, new Path(input)); + Path outPath = new Path(output); + FileOutputFormat.setOutputPath(job, outPath); + HadoopUtil.delete(conf, outPath); + + boolean succeeded = job.waitForCompletion(true); + if (!succeeded) { + throw new IllegalStateException("Job failed!"); + } + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorMapper.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorMapper.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorMapper.java new file mode 100644 index 0000000..50e5f37 --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorMapper.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.text.wikipedia; + +import com.google.common.io.Closeables; +import org.apache.commons.lang3.StringEscapeUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DefaultStringifier; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.util.GenericsUtil; +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.analysis.TokenStream; +import org.apache.lucene.analysis.tokenattributes.CharTermAttribute; +import org.apache.mahout.common.ClassUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.StringReader; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Set; +import java.util.regex.Pattern; + +/** + * Maps over Wikipedia xml format and output all document having the category listed in the input category + * file + * + */ +public class WikipediaDatasetCreatorMapper extends Mapper<LongWritable, Text, Text, Text> { + + private static final Logger log = LoggerFactory.getLogger(WikipediaDatasetCreatorMapper.class); + + private static final Pattern SPACE_NON_ALPHA_PATTERN = Pattern.compile("[\\s\\W]"); + private static final Pattern OPEN_TEXT_TAG_PATTERN = Pattern.compile("<text xml:space=\"preserve\">"); + private static final Pattern CLOSE_TEXT_TAG_PATTERN = Pattern.compile("</text>"); + + private List<String> inputCategories; + private List<Pattern> inputCategoryPatterns; + private boolean exactMatchOnly; + private Analyzer analyzer; + + @Override + protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { + String document = value.toString(); + document = StringEscapeUtils.unescapeHtml4(CLOSE_TEXT_TAG_PATTERN.matcher( + OPEN_TEXT_TAG_PATTERN.matcher(document).replaceFirst("")).replaceAll("")); + String catMatch = findMatchingCategory(document); + if (!"Unknown".equals(catMatch)) { + StringBuilder contents = new StringBuilder(1000); + TokenStream stream = analyzer.tokenStream(catMatch, new StringReader(document)); + CharTermAttribute termAtt = stream.addAttribute(CharTermAttribute.class); + stream.reset(); + while (stream.incrementToken()) { + contents.append(termAtt.buffer(), 0, termAtt.length()).append(' '); + } + context.write( + new Text(SPACE_NON_ALPHA_PATTERN.matcher(catMatch).replaceAll("_")), + new Text(contents.toString())); + stream.end(); + Closeables.close(stream, true); + } + } + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + super.setup(context); + + Configuration conf = context.getConfiguration(); + + if (inputCategories == null) { + Set<String> newCategories = new HashSet<>(); + DefaultStringifier<Set<String>> setStringifier = + new DefaultStringifier<>(conf, GenericsUtil.getClass(newCategories)); + String categoriesStr = conf.get("wikipedia.categories", setStringifier.toString(newCategories)); + Set<String> inputCategoriesSet = setStringifier.fromString(categoriesStr); + inputCategories = new ArrayList<>(inputCategoriesSet); + inputCategoryPatterns = new ArrayList<>(inputCategories.size()); + for (String inputCategory : inputCategories) { + inputCategoryPatterns.add(Pattern.compile(".*\\b" + inputCategory + "\\b.*")); + } + + } + + exactMatchOnly = conf.getBoolean("exact.match.only", false); + + if (analyzer == null) { + String analyzerStr = conf.get("analyzer.class", 
WikipediaAnalyzer.class.getName()); + analyzer = ClassUtils.instantiateAs(analyzerStr, Analyzer.class); + } + + log.info("Configure: Input Categories size: {} Exact Match: {} Analyzer: {}", + inputCategories.size(), exactMatchOnly, analyzer.getClass().getName()); + } + + private String findMatchingCategory(String document) { + int startIndex = 0; + int categoryIndex; + while ((categoryIndex = document.indexOf("[[Category:", startIndex)) != -1) { + categoryIndex += 11; + int endIndex = document.indexOf("]]", categoryIndex); + if (endIndex >= document.length() || endIndex < 0) { + break; + } + String category = document.substring(categoryIndex, endIndex).toLowerCase(Locale.ENGLISH).trim(); + // categories.add(category.toLowerCase()); + if (exactMatchOnly && inputCategories.contains(category)) { + return category; + } + if (!exactMatchOnly) { + for (int i = 0; i < inputCategories.size(); i++) { + String inputCategory = inputCategories.get(i); + Pattern inputCategoryPattern = inputCategoryPatterns.get(i); + if (inputCategoryPattern.matcher(category).matches()) { // inexact match with word boundary. + return inputCategory; + } + } + } + startIndex = endIndex; + } + return "Unknown"; + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorReducer.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorReducer.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorReducer.java new file mode 100644 index 0000000..bf921fc --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorReducer.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.text.wikipedia; + +import java.io.IOException; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Reducer; + +/** + * Can also be used as a local Combiner + */ +public class WikipediaDatasetCreatorReducer extends Reducer<Text, Text, Text, Text> { + + @Override + protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { + // Key is label,word, value is the number of times we've seen this label + // word per local node. 
Output is the same + for (Text value : values) { + context.write(key, value); + } + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaMapper.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaMapper.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaMapper.java new file mode 100644 index 0000000..abd3a04 --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaMapper.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.text.wikipedia; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Locale; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.commons.lang3.StringEscapeUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DefaultStringifier; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.util.GenericsUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Maps over Wikipedia xml format and output all document having the category listed in the input category + * file + * + */ +public class WikipediaMapper extends Mapper<LongWritable, Text, Text, Text> { + + private static final Logger log = LoggerFactory.getLogger(WikipediaMapper.class); + + private static final Pattern SPACE_NON_ALPHA_PATTERN = Pattern.compile("[\\s]"); + + private static final String START_DOC = "<text xml:space=\"preserve\">"; + + private static final String END_DOC = "</text>"; + + private static final Pattern TITLE = Pattern.compile("<title>(.*)<\\/title>"); + + private static final String REDIRECT = "<redirect />"; + + private Set<String> inputCategories; + + private boolean exactMatchOnly; + + private boolean all; + + private boolean removeLabels; + + @Override + protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { + + String content = value.toString(); + if (content.contains(REDIRECT)) { + return; + } + String document; + String title; + try { + document = getDocument(content); + title = getTitle(content); + } catch (RuntimeException e) { + // TODO: reporter.getCounter("Wikipedia", "Parse errors").increment(1); + return; + } + + String catMatch = findMatchingCategory(document); + if (!all) { + if ("Unknown".equals(catMatch)) { + return; + } + } + + document = StringEscapeUtils.unescapeHtml4(document); + if (removeLabels) 
{ + document = removeCategoriesFromText(document); + // Reject documents with malformed tags + if (document == null) { + return; + } + } + + // write out in Bayes input style: key: /Category/document_name + String category = "/" + catMatch.toLowerCase(Locale.ENGLISH) + "/" + + SPACE_NON_ALPHA_PATTERN.matcher(title).replaceAll("_"); + + context.write(new Text(category), new Text(document)); + } + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + super.setup(context); + Configuration conf = context.getConfiguration(); + + Set<String> newCategories = new HashSet<>(); + DefaultStringifier<Set<String>> setStringifier = + new DefaultStringifier<>(conf, GenericsUtil.getClass(newCategories)); + + String categoriesStr = conf.get("wikipedia.categories"); + inputCategories = setStringifier.fromString(categoriesStr); + exactMatchOnly = conf.getBoolean("exact.match.only", false); + all = conf.getBoolean("all.files", false); + removeLabels = conf.getBoolean("remove.labels",false); + log.info("Configure: Input Categories size: {} All: {} Exact Match: {} Remove Labels from Text: {}", + inputCategories.size(), all, exactMatchOnly, removeLabels); + } + + private static String getDocument(String xml) { + int start = xml.indexOf(START_DOC) + START_DOC.length(); + int end = xml.indexOf(END_DOC, start); + return xml.substring(start, end); + } + + private static String getTitle(CharSequence xml) { + Matcher m = TITLE.matcher(xml); + return m.find() ? m.group(1) : ""; + } + + private String findMatchingCategory(String document) { + int startIndex = 0; + int categoryIndex; + while ((categoryIndex = document.indexOf("[[Category:", startIndex)) != -1) { + categoryIndex += 11; + int endIndex = document.indexOf("]]", categoryIndex); + if (endIndex >= document.length() || endIndex < 0) { + break; + } + String category = document.substring(categoryIndex, endIndex).toLowerCase(Locale.ENGLISH).trim(); + if (exactMatchOnly && inputCategories.contains(category)) { + return category.toLowerCase(Locale.ENGLISH); + } + if (!exactMatchOnly) { + for (String inputCategory : inputCategories) { + if (category.contains(inputCategory)) { // we have an inexact match + return inputCategory.toLowerCase(Locale.ENGLISH); + } + } + } + startIndex = endIndex; + } + return "Unknown"; + } + + private String removeCategoriesFromText(String document) { + int startIndex = 0; + int categoryIndex; + try { + while ((categoryIndex = document.indexOf("[[Category:", startIndex)) != -1) { + int endIndex = document.indexOf("]]", categoryIndex); + if (endIndex >= document.length() || endIndex < 0) { + break; + } + document = document.replace(document.substring(categoryIndex, endIndex + 2), ""); + if (categoryIndex < document.length()) { + startIndex = categoryIndex; + } else { + break; + } + } + } catch(StringIndexOutOfBoundsException e) { + return null; + } + return document; + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaXmlSplitter.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaXmlSplitter.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaXmlSplitter.java new file mode 100644 index 0000000..fc065fe --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaXmlSplitter.java @@ 
-0,0 +1,234 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.text.wikipedia; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.net.URI; +import java.text.DecimalFormat; +import java.text.NumberFormat; + +import org.apache.commons.cli2.CommandLine; +import org.apache.commons.cli2.Group; +import org.apache.commons.cli2.Option; +import org.apache.commons.cli2.OptionException; +import org.apache.commons.cli2.builder.ArgumentBuilder; +import org.apache.commons.cli2.builder.DefaultOptionBuilder; +import org.apache.commons.cli2.builder.GroupBuilder; +import org.apache.commons.cli2.commandline.Parser; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.compress.BZip2Codec; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.mahout.common.CommandLineUtil; +import org.apache.mahout.common.iterator.FileLineIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * <p>The Bayes example package provides some helper classes for training the Naive Bayes classifier + * on the Twenty Newsgroups data. See {@code PrepareTwentyNewsgroups} + * for details on running the trainer and + * formatting the Twenty Newsgroups data properly for the training.</p> + * + * <p>The easiest way to prepare the data is to use the ant task in core/build.xml:</p> + * + * <p>{@code ant extract-20news-18828}</p> + * + * <p>This runs the arg line:</p> + * + * <p>{@code -p $\{working.dir\}/20news-18828/ -o $\{working.dir\}/20news-18828-collapse -a $\{analyzer\} -c UTF-8}</p> + * + * <p>To Run the Wikipedia examples (assumes you've built the Mahout Job jar):</p> + * + * <ol> + * <li>Download the Wikipedia Dataset. 
Use the Ant target: {@code ant enwiki-files}</li> + * <li>Chunk the data using the WikipediaXmlSplitter (from the Hadoop home): + * {@code bin/hadoop jar $MAHOUT_HOME/target/mahout-examples-0.x + * org.apache.mahout.classifier.bayes.WikipediaXmlSplitter + * -d $MAHOUT_HOME/examples/temp/enwiki-latest-pages-articles.xml + * -o $MAHOUT_HOME/examples/work/wikipedia/chunks/ -c 64}</li> + * </ol> + */ +public final class WikipediaXmlSplitter { + + private static final Logger log = LoggerFactory.getLogger(WikipediaXmlSplitter.class); + + private WikipediaXmlSplitter() { } + + public static void main(String[] args) throws IOException { + DefaultOptionBuilder obuilder = new DefaultOptionBuilder(); + ArgumentBuilder abuilder = new ArgumentBuilder(); + GroupBuilder gbuilder = new GroupBuilder(); + + Option dumpFileOpt = obuilder.withLongName("dumpFile").withRequired(true).withArgument( + abuilder.withName("dumpFile").withMinimum(1).withMaximum(1).create()).withDescription( + "The path to the wikipedia dump file (.bz2 or uncompressed)").withShortName("d").create(); + + Option outputDirOpt = obuilder.withLongName("outputDir").withRequired(true).withArgument( + abuilder.withName("outputDir").withMinimum(1).withMaximum(1).create()).withDescription( + "The output directory to place the splits in:\n" + + "local files:\n\t/var/data/wikipedia-xml-chunks or\n\tfile:///var/data/wikipedia-xml-chunks\n" + + "Hadoop DFS:\n\thdfs://wikipedia-xml-chunks\n" + + "AWS S3 (blocks):\n\ts3://bucket-name/wikipedia-xml-chunks\n" + + "AWS S3 (native files):\n\ts3n://bucket-name/wikipedia-xml-chunks\n") + + .withShortName("o").create(); + + Option s3IdOpt = obuilder.withLongName("s3ID").withRequired(false).withArgument( + abuilder.withName("s3Id").withMinimum(1).withMaximum(1).create()).withDescription("Amazon S3 ID key") + .withShortName("i").create(); + Option s3SecretOpt = obuilder.withLongName("s3Secret").withRequired(false).withArgument( + abuilder.withName("s3Secret").withMinimum(1).withMaximum(1).create()).withDescription( + "Amazon S3 secret key").withShortName("s").create(); + + Option chunkSizeOpt = obuilder.withLongName("chunkSize").withRequired(true).withArgument( + abuilder.withName("chunkSize").withMinimum(1).withMaximum(1).create()).withDescription( + "The Size of the chunk, in megabytes").withShortName("c").create(); + Option numChunksOpt = obuilder + .withLongName("numChunks") + .withRequired(false) + .withArgument(abuilder.withName("numChunks").withMinimum(1).withMaximum(1).create()) + .withDescription( + "The maximum number of chunks to create. 
If specified, program will only create a subset of the chunks") + .withShortName("n").create(); + Group group = gbuilder.withName("Options").withOption(dumpFileOpt).withOption(outputDirOpt).withOption( + chunkSizeOpt).withOption(numChunksOpt).withOption(s3IdOpt).withOption(s3SecretOpt).create(); + + Parser parser = new Parser(); + parser.setGroup(group); + CommandLine cmdLine; + try { + cmdLine = parser.parse(args); + } catch (OptionException e) { + log.error("Error while parsing options", e); + CommandLineUtil.printHelp(group); + return; + } + + Configuration conf = new Configuration(); + String dumpFilePath = (String) cmdLine.getValue(dumpFileOpt); + String outputDirPath = (String) cmdLine.getValue(outputDirOpt); + + if (cmdLine.hasOption(s3IdOpt)) { + String id = (String) cmdLine.getValue(s3IdOpt); + conf.set("fs.s3n.awsAccessKeyId", id); + conf.set("fs.s3.awsAccessKeyId", id); + } + if (cmdLine.hasOption(s3SecretOpt)) { + String secret = (String) cmdLine.getValue(s3SecretOpt); + conf.set("fs.s3n.awsSecretAccessKey", secret); + conf.set("fs.s3.awsSecretAccessKey", secret); + } + // do not compute crc file when using local FS + conf.set("fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem"); + FileSystem fs = FileSystem.get(URI.create(outputDirPath), conf); + + int chunkSize = 1024 * 1024 * Integer.parseInt((String) cmdLine.getValue(chunkSizeOpt)); + + int numChunks = Integer.MAX_VALUE; + if (cmdLine.hasOption(numChunksOpt)) { + numChunks = Integer.parseInt((String) cmdLine.getValue(numChunksOpt)); + } + + String header = "<mediawiki xmlns=\"http://www.mediawiki.org/xml/export-0.3/\" " + + "xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" " + + "xsi:schemaLocation=\"http://www.mediawiki.org/xml/export-0.3/ " + + "http://www.mediawiki.org/xml/export-0.3.xsd\" " + "version=\"0.3\" " + + "xml:lang=\"en\">\n" + " <siteinfo>\n" + "<sitename>Wikipedia</sitename>\n" + + " <base>http://en.wikipedia.org/wiki/Main_Page</base>\n" + + " <generator>MediaWiki 1.13alpha</generator>\n" + " <case>first-letter</case>\n" + + " <namespaces>\n" + " <namespace key=\"-2\">Media</namespace>\n" + + " <namespace key=\"-1\">Special</namespace>\n" + " <namespace key=\"0\" />\n" + + " <namespace key=\"1\">Talk</namespace>\n" + + " <namespace key=\"2\">User</namespace>\n" + + " <namespace key=\"3\">User talk</namespace>\n" + + " <namespace key=\"4\">Wikipedia</namespace>\n" + + " <namespace key=\"5\">Wikipedia talk</namespace>\n" + + " <namespace key=\"6\">Image</namespace>\n" + + " <namespace key=\"7\">Image talk</namespace>\n" + + " <namespace key=\"8\">MediaWiki</namespace>\n" + + " <namespace key=\"9\">MediaWiki talk</namespace>\n" + + " <namespace key=\"10\">Template</namespace>\n" + + " <namespace key=\"11\">Template talk</namespace>\n" + + " <namespace key=\"12\">Help</namespace>\n" + + " <namespace key=\"13\">Help talk</namespace>\n" + + " <namespace key=\"14\">Category</namespace>\n" + + " <namespace key=\"15\">Category talk</namespace>\n" + + " <namespace key=\"100\">Portal</namespace>\n" + + " <namespace key=\"101\">Portal talk</namespace>\n" + " </namespaces>\n" + + " </siteinfo>\n"; + + StringBuilder content = new StringBuilder(); + content.append(header); + NumberFormat decimalFormatter = new DecimalFormat("0000"); + File dumpFile = new File(dumpFilePath); + + // If the specified path for the input file is incorrect, return immediately + if (!dumpFile.exists()) { + log.error("Input file path {} doesn't exist", dumpFilePath); + return; + } + + FileLineIterator it; + if 
(dumpFilePath.endsWith(".bz2")) { + // default compression format from http://download.wikimedia.org + CompressionCodec codec = new BZip2Codec(); + it = new FileLineIterator(codec.createInputStream(new FileInputStream(dumpFile))); + } else { + // assume the user has previously de-compressed the dump file + it = new FileLineIterator(dumpFile); + } + int fileNumber = 0; + while (it.hasNext()) { + String thisLine = it.next(); + if (thisLine.trim().startsWith("<page>")) { + boolean end = false; + while (!thisLine.trim().startsWith("</page>")) { + content.append(thisLine).append('\n'); + if (it.hasNext()) { + thisLine = it.next(); + } else { + end = true; + break; + } + } + content.append(thisLine).append('\n'); + + if (content.length() > chunkSize || end) { + content.append("</mediawiki>"); + fileNumber++; + String filename = outputDirPath + "/chunk-" + decimalFormatter.format(fileNumber) + ".xml"; + try (BufferedWriter chunkWriter = + new BufferedWriter(new OutputStreamWriter(fs.create(new Path(filename)), "UTF-8"))) { + chunkWriter.write(content.toString(), 0, content.length()); + } + if (fileNumber >= numChunks) { + break; + } + content = new StringBuilder(); + content.append(header); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/XmlInputFormat.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/XmlInputFormat.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/XmlInputFormat.java new file mode 100644 index 0000000..afd350f --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/XmlInputFormat.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.text.wikipedia; + +import com.google.common.io.Closeables; +import org.apache.commons.io.Charsets; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Reads records that are delimited by a specific begin/end tag. 
+ */ +public class XmlInputFormat extends TextInputFormat { + + private static final Logger log = LoggerFactory.getLogger(XmlInputFormat.class); + + public static final String START_TAG_KEY = "xmlinput.start"; + public static final String END_TAG_KEY = "xmlinput.end"; + + @Override + public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) { + try { + return new XmlRecordReader((FileSplit) split, context.getConfiguration()); + } catch (IOException ioe) { + log.warn("Error while creating XmlRecordReader", ioe); + return null; + } + } + + /** + * XMLRecordReader class to read through a given xml document to output xml blocks as records as specified + * by the start tag and end tag + * + */ + public static class XmlRecordReader extends RecordReader<LongWritable, Text> { + + private final byte[] startTag; + private final byte[] endTag; + private final long start; + private final long end; + private final FSDataInputStream fsin; + private final DataOutputBuffer buffer = new DataOutputBuffer(); + private LongWritable currentKey; + private Text currentValue; + + public XmlRecordReader(FileSplit split, Configuration conf) throws IOException { + startTag = conf.get(START_TAG_KEY).getBytes(Charsets.UTF_8); + endTag = conf.get(END_TAG_KEY).getBytes(Charsets.UTF_8); + + // open the file and seek to the start of the split + start = split.getStart(); + end = start + split.getLength(); + Path file = split.getPath(); + FileSystem fs = file.getFileSystem(conf); + fsin = fs.open(split.getPath()); + fsin.seek(start); + } + + private boolean next(LongWritable key, Text value) throws IOException { + if (fsin.getPos() < end && readUntilMatch(startTag, false)) { + try { + buffer.write(startTag); + if (readUntilMatch(endTag, true)) { + key.set(fsin.getPos()); + value.set(buffer.getData(), 0, buffer.getLength()); + return true; + } + } finally { + buffer.reset(); + } + } + return false; + } + + @Override + public void close() throws IOException { + Closeables.close(fsin, true); + } + + @Override + public float getProgress() throws IOException { + return (fsin.getPos() - start) / (float) (end - start); + } + + private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException { + int i = 0; + while (true) { + int b = fsin.read(); + // end of file: + if (b == -1) { + return false; + } + // save to buffer: + if (withinBlock) { + buffer.write(b); + } + + // check if we're matching: + if (b == match[i]) { + i++; + if (i >= match.length) { + return true; + } + } else { + i = 0; + } + // see if we've passed the stop point: + if (!withinBlock && i == 0 && fsin.getPos() >= end) { + return false; + } + } + } + + @Override + public LongWritable getCurrentKey() throws IOException, InterruptedException { + return currentKey; + } + + @Override + public Text getCurrentValue() throws IOException, InterruptedException { + return currentValue; + } + + @Override + public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + currentKey = new LongWritable(); + currentValue = new Text(); + return next(currentKey, currentValue); + } + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/Bump125.java ---------------------------------------------------------------------- diff --git 
a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/Bump125.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/Bump125.java new file mode 100644 index 0000000..1c55090 --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/Bump125.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.utils; + +/** + * Helps with making nice intervals at arbitrary scale. + * + * One use case is where we are producing progress or error messages every time an incoming + * record is received. It is generally bad form to produce a message for <i>every</i> input + * so it would be better to produce a message for each of the first 10 records, then every + * other record up to 20 and then every 5 records up to 50 and then every 10 records up to 100, + * more or less. The pattern can now repeat scaled up by 100. The total number of messages will scale + * with the log of the number of input lines which is much more survivable than direct output + * and because early records all get messages, we get indications early. + */ +public class Bump125 { + private static final int[] BUMPS = {1, 2, 5}; + + static int scale(double value, double base) { + double scale = value / base; + // scan for correct step + int i = 0; + while (i < BUMPS.length - 1 && BUMPS[i + 1] <= scale) { + i++; + } + return BUMPS[i]; + } + + static long base(double value) { + return Math.max(1, (long) Math.pow(10, (int) Math.floor(Math.log10(value)))); + } + + private long counter = 0; + + public long increment() { + long delta; + if (counter >= 10) { + long base = base(counter / 4.0); + int scale = scale(counter / 4.0, base); + delta = base * scale; + } else { + delta = 1; + } + counter += delta; + return counter; + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/MatrixDumper.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/MatrixDumper.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/MatrixDumper.java new file mode 100644 index 0000000..f63de83 --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/MatrixDumper.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.utils; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.io.PrintStream; +import java.util.List; +import java.util.Map; + +import org.apache.commons.io.Charsets; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.ToolRunner; +import org.apache.mahout.common.AbstractJob; +import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterator; +import org.apache.mahout.math.Matrix; +import org.apache.mahout.math.MatrixWritable; + +/** + * Export a Matrix in various text formats: + * * CSV file + * + * Input format: Hadoop SequenceFile with Text key and MatrixWritable value, 1 pair + * TODO: + * Needs class for key value- should not hard-code to Text. + * Options for row and column headers- stats software can be picky. + * Assumes only one matrix in a file. + */ +public final class MatrixDumper extends AbstractJob { + + private MatrixDumper() { } + + public static void main(String[] args) throws Exception { + ToolRunner.run(new MatrixDumper(), args); + } + + @Override + public int run(String[] args) throws Exception { + + addInputOption(); + addOption("output", "o", "Output path", null); // AbstractJob output feature requires param + Map<String, List<String>> parsedArgs = parseArguments(args); + if (parsedArgs == null) { + return -1; + } + String outputFile = hasOption("output") ? 
getOption("output") : null; + exportCSV(getInputPath(), outputFile, false); + return 0; + } + + private static void exportCSV(Path inputPath, String outputFile, boolean doLabels) throws IOException { + SequenceFileValueIterator<MatrixWritable> it = + new SequenceFileValueIterator<>(inputPath, true, new Configuration()); + Matrix m = it.next().get(); + it.close(); + PrintStream ps = getPrintStream(outputFile); + String[] columnLabels = getLabels(m.numCols(), m.getColumnLabelBindings(), "col"); + String[] rowLabels = getLabels(m.numRows(), m.getRowLabelBindings(), "row"); + if (doLabels) { + ps.print("rowid,"); + ps.print(columnLabels[0]); + for (int c = 1; c < m.numCols(); c++) { + ps.print(',' + columnLabels[c]); + } + ps.println(); + } + for (int r = 0; r < m.numRows(); r++) { + if (doLabels) { + ps.print(rowLabels[0] + ','); + } + ps.print(Double.toString(m.getQuick(r,0))); + for (int c = 1; c < m.numCols(); c++) { + ps.print(","); + ps.print(Double.toString(m.getQuick(r,c))); + } + ps.println(); + } + if (ps != System.out) { + ps.close(); + } + } + + private static PrintStream getPrintStream(String outputPath) throws IOException { + if (outputPath == null) { + return System.out; + } + File outputFile = new File(outputPath); + if (outputFile.exists()) { + outputFile.delete(); + } + outputFile.createNewFile(); + OutputStream os = new FileOutputStream(outputFile); + return new PrintStream(os, false, Charsets.UTF_8.displayName()); + } + + /** + * return the label set, sorted by matrix order + * if there are no labels, fabricate them using the starter string + * @param length + */ + private static String[] getLabels(int length, Map<String,Integer> labels, String start) { + if (labels != null) { + return sortLabels(labels); + } + String[] sorted = new String[length]; + for (int i = 1; i <= length; i++) { + sorted[i] = start + i; + } + return sorted; + } + + private static String[] sortLabels(Map<String,Integer> labels) { + String[] sorted = new String[labels.size()]; + for (Map.Entry<String,Integer> entry : labels.entrySet()) { + sorted[entry.getValue()] = entry.getKey(); + } + return sorted; + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/SequenceFileDumper.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/SequenceFileDumper.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/SequenceFileDumper.java new file mode 100644 index 0000000..e01868a --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/SequenceFileDumper.java @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.utils; + +import java.io.File; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.util.ArrayList; +import java.util.List; + +import com.google.common.io.Closeables; +import com.google.common.io.Files; +import org.apache.commons.io.Charsets; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.mahout.common.AbstractJob; +import org.apache.mahout.common.Pair; +import org.apache.mahout.common.iterator.sequencefile.PathFilters; +import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterator; +import org.apache.mahout.math.list.IntArrayList; +import org.apache.mahout.math.map.OpenObjectIntHashMap; + +public final class SequenceFileDumper extends AbstractJob { + + public SequenceFileDumper() { + setConf(new Configuration()); + } + + @Override + public int run(String[] args) throws Exception { + + addInputOption(); + addOutputOption(); + addOption("substring", "b", "The number of chars to print out per value", false); + addOption(buildOption("count", "c", "Report the count only", false, false, null)); + addOption("numItems", "n", "Output at most <n> key value pairs", false); + addOption(buildOption("facets", "fa", "Output the counts per key. Note, if there are a lot of unique keys, " + + "this can take up a fair amount of memory", false, false, null)); + addOption(buildOption("quiet", "q", "Print only file contents.", false, false, null)); + + if (parseArguments(args, false, true) == null) { + return -1; + } + + Path[] pathArr; + Configuration conf = new Configuration(); + Path input = getInputPath(); + FileSystem fs = input.getFileSystem(conf); + if (fs.getFileStatus(input).isDir()) { + pathArr = FileUtil.stat2Paths(fs.listStatus(input, PathFilters.logsCRCFilter())); + } else { + pathArr = new Path[1]; + pathArr[0] = input; + } + + + Writer writer; + boolean shouldClose; + if (hasOption("output")) { + shouldClose = true; + writer = Files.newWriter(new File(getOption("output")), Charsets.UTF_8); + } else { + shouldClose = false; + writer = new OutputStreamWriter(System.out, Charsets.UTF_8); + } + try { + for (Path path : pathArr) { + if (!hasOption("quiet")) { + writer.append("Input Path: ").append(String.valueOf(path)).append('\n'); + } + + int sub = Integer.MAX_VALUE; + if (hasOption("substring")) { + sub = Integer.parseInt(getOption("substring")); + } + boolean countOnly = hasOption("count"); + SequenceFileIterator<?, ?> iterator = new SequenceFileIterator<>(path, true, conf); + if (!hasOption("quiet")) { + writer.append("Key class: ").append(iterator.getKeyClass().toString()); + writer.append(" Value Class: ").append(iterator.getValueClass().toString()).append('\n'); + } + OpenObjectIntHashMap<String> facets = null; + if (hasOption("facets")) { + facets = new OpenObjectIntHashMap<>(); + } + long count = 0; + if (countOnly) { + while (iterator.hasNext()) { + Pair<?, ?> record = iterator.next(); + String key = record.getFirst().toString(); + if (facets != null) { + facets.adjustOrPutValue(key, 1, 1); //either insert or add 1 + } + count++; + } + writer.append("Count: ").append(String.valueOf(count)).append('\n'); + } else { + long numItems = Long.MAX_VALUE; + if (hasOption("numItems")) { + numItems = Long.parseLong(getOption("numItems")); + if (!hasOption("quiet")) { + writer.append("Max Items to dump: ").append(String.valueOf(numItems)).append("\n"); + } + } + while (iterator.hasNext() && count < 
numItems) { + Pair<?, ?> record = iterator.next(); + String key = record.getFirst().toString(); + writer.append("Key: ").append(key); + String str = record.getSecond().toString(); + writer.append(": Value: ").append(str.length() > sub + ? str.substring(0, sub) : str); + writer.write('\n'); + if (facets != null) { + facets.adjustOrPutValue(key, 1, 1); //either insert or add 1 + } + count++; + } + if (!hasOption("quiet")) { + writer.append("Count: ").append(String.valueOf(count)).append('\n'); + } + } + if (facets != null) { + List<String> keyList = new ArrayList<>(facets.size()); + + IntArrayList valueList = new IntArrayList(facets.size()); + facets.pairsSortedByKey(keyList, valueList); + writer.append("-----Facets---\n"); + writer.append("Key\t\tCount\n"); + int i = 0; + for (String key : keyList) { + writer.append(key).append("\t\t").append(String.valueOf(valueList.get(i++))).append('\n'); + } + } + } + writer.flush(); + + } finally { + if (shouldClose) { + Closeables.close(writer, false); + } + } + + + return 0; + } + + public static void main(String[] args) throws Exception { + new SequenceFileDumper().run(args); + } + +}
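
Usage sketches for the classes added above (these snippets are not part of the commit; every path, job name, and argument value in them is a placeholder). First, WikipediaXmlSplitter: besides the hadoop-jar command line quoted in its javadoc, the splitter can be driven directly through its main(), using the -d/-o/-c/-n options declared in the code:

public class SplitEnwikiDump {
  public static void main(String[] args) throws Exception {
    org.apache.mahout.text.wikipedia.WikipediaXmlSplitter.main(new String[] {
        "-d", "/tmp/enwiki-latest-pages-articles.xml.bz2",  // .bz2 input is decompressed on the fly
        "-o", "file:///tmp/wikipedia-xml-chunks",           // local FS, HDFS, or S3 URI
        "-c", "64",                                         // chunk size in megabytes
        "-n", "10"                                          // optional: stop after 10 chunks
    });
  }
}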
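
XmlInputFormat is configured entirely through the two keys it exposes, START_TAG_KEY and END_TAG_KEY. A minimal sketch, assuming Hadoop 2's Job API and a hypothetical chunk directory, that hands each <page>...</page> block to the mappers as one Text value keyed by its file offset:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.mahout.text.wikipedia.XmlInputFormat;

public class XmlJobSetup {
  public static Job buildJob(Configuration conf) throws Exception {
    // Each record is the byte range from <page> through </page>, inclusive.
    conf.set(XmlInputFormat.START_TAG_KEY, "<page>");
    conf.set(XmlInputFormat.END_TAG_KEY, "</page>");
    Job job = Job.getInstance(conf, "wikipedia-pages");                        // job name is a placeholder
    job.setInputFormatClass(XmlInputFormat.class);
    FileInputFormat.addInputPath(job, new Path("/tmp/wikipedia-xml-chunks"));  // placeholder path
    // Mapper, reducer and output settings are omitted here.
    return job;
  }
}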
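
Bump125's javadoc describes the 1-2-5 progress pattern, but no caller is included in this commit; the following is a minimal sketch of the assumed usage pattern, logging only when the running record count reaches the next bumped threshold:

import org.apache.mahout.utils.Bump125;

public class ProgressDemo {
  public static void main(String[] args) {
    Bump125 bump = new Bump125();
    long nextReport = bump.increment();   // yields 1, 2, ..., 10, then 12, 14, ..., 20, 25, 30, ... in roughly 1-2-5 scaled steps
    for (long record = 1; record <= 1000; record++) {
      // ... process one input record here ...
      if (record == nextReport) {
        System.out.println("processed " + record + " records");
        nextReport = bump.increment();
      }
    }
  }
}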
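
MatrixDumper is meant to be run through its main(); below is a hypothetical invocation that dumps a one-pair SequenceFile of Text/MatrixWritable to CSV (with no --output it prints to stdout). Note that, as committed, the fabricated-label branch of getLabels() indexes 1 through length into a length-sized array, so the sketch assumes a matrix that was written with row and column label bindings:

public class DumpMatrixToCsv {
  public static void main(String[] args) throws Exception {
    // Both paths are placeholders; the input must hold exactly one MatrixWritable value.
    org.apache.mahout.utils.MatrixDumper.main(new String[] {
        "--input", "/tmp/matrix.seq",
        "--output", "/tmp/matrix.csv"
    });
  }
}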
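
SequenceFileDumper exposes its switches through AbstractJob's option parsing; a hypothetical run that prints at most five pairs, truncates each value to 100 characters, and reports per-key facet counts (the long-option spellings follow the names registered above and are the main assumption here):

public class DumpSeqFile {
  public static void main(String[] args) throws Exception {
    new org.apache.mahout.utils.SequenceFileDumper().run(new String[] {
        "--input", "/tmp/part-r-00000",   // placeholder; a single file or a directory of part files
        "--numItems", "5",
        "--substring", "100",
        "--facets"
    });
  }
}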
