http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/email/MailProcessor.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/email/MailProcessor.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/email/MailProcessor.java new file mode 100644 index 0000000..7db836f --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/email/MailProcessor.java @@ -0,0 +1,183 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.utils.email; + +import org.apache.mahout.common.iterator.FileLineIterable; +import org.apache.mahout.utils.io.ChunkedWriter; +import org.apache.mahout.utils.io.ChunkedWrapper; +import org.apache.mahout.utils.io.IOWriterWrapper; +import org.apache.mahout.utils.io.WrappedWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.Writer; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Converts an mbox mail archive into a group of Hadoop Sequence Files with equal size. The archive may optionally be + * gzipped or zipped. @see org.apache.mahout.text.SequenceFilesFromMailArchives + */ +public class MailProcessor { + + private static final Pattern MESSAGE_START = Pattern.compile("^From \\S+@\\S.*\\d{4}$", Pattern.CASE_INSENSITIVE); + private static final Pattern MESSAGE_ID_PREFIX = Pattern.compile("^message-id: <(.*)>$", Pattern.CASE_INSENSITIVE); + // regular expressions used to parse individual messages + public static final Pattern SUBJECT_PREFIX = Pattern.compile("^subject: (.*)$", Pattern.CASE_INSENSITIVE); + //we need to have at least one character + public static final Pattern FROM_PREFIX = Pattern.compile("^from: (\\S.*)$", Pattern.CASE_INSENSITIVE); + public static final Pattern REFS_PREFIX = Pattern.compile("^references: (.*)$", Pattern.CASE_INSENSITIVE); + public static final Pattern TO_PREFIX = Pattern.compile("^to: (.*)$", Pattern.CASE_INSENSITIVE); + + private final String prefix; + private final MailOptions options; + private final WrappedWriter writer; + + private static final Logger log = LoggerFactory.getLogger(MailProcessor.class); + + /** + * Creates a {@code MailProcessor} that does not write to sequence files, but to a single text file. + * This constructor is for debugging and testing purposes. 
+ */ + public MailProcessor(MailOptions options, String prefix, Writer writer) { + this.writer = new IOWriterWrapper(writer); + this.options = options; + this.prefix = prefix; + } + + /** + * This is the main constructor of {@code MailProcessor}. + */ + public MailProcessor(MailOptions options, String prefix, ChunkedWriter writer) { + this.writer = new ChunkedWrapper(writer); + this.options = options; + this.prefix = prefix; + } + + /** + * Parses one complete mail archive, writing output to the {@code writer} constructor parameter. + * @param mboxFile mail archive to parse + * @return number of parsed mails + * @throws IOException + */ + public long parseMboxLineByLine(File mboxFile) throws IOException { + long messageCount = 0; + try { + StringBuilder contents = new StringBuilder(); + // tmps used during mail message parsing + StringBuilder body = new StringBuilder(); + Matcher messageIdMatcher = MESSAGE_ID_PREFIX.matcher(""); + Matcher messageBoundaryMatcher = MESSAGE_START.matcher(""); + String[] patternResults = new String[options.getPatternsToMatch().length]; + Matcher[] matchers = new Matcher[options.getPatternsToMatch().length]; + for (int i = 0; i < matchers.length; i++) { + matchers[i] = options.getPatternsToMatch()[i].matcher(""); + } + + String messageId = null; + boolean inBody = false; + Pattern quotedTextPattern = options.getQuotedTextPattern(); + for (String nextLine : new FileLineIterable(mboxFile, options.getCharset(), false)) { + if (options.isStripQuotedText() && quotedTextPattern.matcher(nextLine).find()) { + continue; + } + for (int i = 0; i < matchers.length; i++) { + Matcher matcher = matchers[i]; + matcher.reset(nextLine); + if (matcher.matches()) { + patternResults[i] = matcher.group(1); + } + } + + // only start appending body content after we've seen a message ID + if (messageId != null) { + // first, see if we hit the end of the message + messageBoundaryMatcher.reset(nextLine); + if (messageBoundaryMatcher.matches()) { + // done parsing this message ... write it out + String key = generateKey(mboxFile, prefix, messageId); + //if this ordering changes, then also change FromEmailToDictionaryMapper + writeContent(options.getSeparator(), contents, body, patternResults); + writer.write(key, contents.toString()); + contents.setLength(0); // reset the buffer + body.setLength(0); + + messageId = null; + inBody = false; + } else { + if (inBody && options.isIncludeBody()) { + if (!nextLine.isEmpty()) { + body.append(nextLine).append(options.getBodySeparator()); + } + } else { + // first empty line we see after reading the message Id + // indicates that we are in the body ... + inBody = nextLine.isEmpty(); + } + } + } else { + if (nextLine.length() > 14) { + messageIdMatcher.reset(nextLine); + if (messageIdMatcher.matches()) { + messageId = messageIdMatcher.group(1); + ++messageCount; + } + } + } + } + // write the last message in the file if available + if (messageId != null) { + String key = generateKey(mboxFile, prefix, messageId); + writeContent(options.getSeparator(), contents, body, patternResults); + writer.write(key, contents.toString()); + contents.setLength(0); // reset the buffer + } + } catch (FileNotFoundException e) { + // Skip file. 
+ log.warn("Unable to process non-existing file", e); + } + // TODO: report exceptions and continue; + return messageCount; + } + + protected static String generateKey(File mboxFile, String prefix, String messageId) { + return prefix + File.separator + mboxFile.getName() + File.separator + messageId; + } + + public String getPrefix() { + return prefix; + } + + public MailOptions getOptions() { + return options; + } + + private static void writeContent(String separator, StringBuilder contents, CharSequence body, String[] matches) { + for (String match : matches) { + if (match != null) { + contents.append(match).append(separator); + } else { + contents.append(separator); + } + } + contents.append('\n').append(body); + } +}
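For reference, a minimal usage sketch of the classes added above, wiring MailProcessor to a ChunkedWriter so an mbox archive ends up in chunked sequence files. It assumes MailOptions exposes setters mirroring the getters MailProcessor calls (setCharset, setSeparator, setBodySeparator, setIncludeBody, setPatternsToMatch) — those setters are not part of this diff — and the file paths are placeholders:

import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.mahout.utils.email.MailOptions;
import org.apache.mahout.utils.email.MailProcessor;
import org.apache.mahout.utils.io.ChunkedWriter;

public class MboxToSequenceFilesSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Writes ~64 MB sequence files named chunk-0, chunk-1, ... under the output directory.
    try (ChunkedWriter chunks = new ChunkedWriter(conf, 64, new Path("/tmp/mail-chunks"))) {
      MailOptions options = new MailOptions();
      // Assumed setters; this diff only shows the corresponding getters used by MailProcessor.
      options.setCharset(StandardCharsets.UTF_8);
      options.setSeparator(":");
      options.setBodySeparator("\n");
      options.setIncludeBody(true);
      options.setPatternsToMatch(new Pattern[] {MailProcessor.FROM_PREFIX, MailProcessor.SUBJECT_PREFIX});

      // Keys are prefix/mboxFileName/messageId; values are the matched headers plus the body.
      MailProcessor processor = new MailProcessor(options, "asf-mail", chunks);
      long messages = processor.parseMboxLineByLine(new File("/tmp/archive.mbox"));
      System.out.println("Parsed " + messages + " messages");
    }
  }
}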
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/io/ChunkedWrapper.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/io/ChunkedWrapper.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/io/ChunkedWrapper.java new file mode 100644 index 0000000..473e86a --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/io/ChunkedWrapper.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.utils.io; + +import java.io.IOException; + +/** + * {@link ChunkedWriter} based implementation of the {@link WrappedWriter} interface. + */ +public class ChunkedWrapper implements WrappedWriter { + + private final ChunkedWriter writer; + + public ChunkedWrapper(ChunkedWriter writer) { + this.writer = writer; + } + + @Override + public void write(String key, String value) throws IOException { + writer.write(key, value); + } + + @Override + public void close() throws IOException { + writer.close(); + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/io/ChunkedWriter.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/io/ChunkedWriter.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/io/ChunkedWriter.java new file mode 100644 index 0000000..66cf15f --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/io/ChunkedWriter.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.mahout.utils.io; + +import com.google.common.io.Closeables; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; + +import java.io.Closeable; +import java.io.IOException; + +/** + * Writes data splitted in multiple Hadoop sequence files of approximate equal size. The data must consist + * of key-value pairs, both of them of String type. All sequence files are created in the same + * directory and named "chunk-0", "chunk-1", etc. + */ +public final class ChunkedWriter implements Closeable { + + private final int maxChunkSizeInBytes; + private final Path output; + private SequenceFile.Writer writer; + private int currentChunkID; + private int currentChunkSize; + private final FileSystem fs; + private final Configuration conf; + + /** + * @param conf needed by Hadoop to know what filesystem implementation to use. + * @param chunkSizeInMB approximate size of each file, in Megabytes. + * @param output directory where the sequence files will be created. + * @throws IOException + */ + public ChunkedWriter(Configuration conf, int chunkSizeInMB, Path output) throws IOException { + this.output = output; + this.conf = conf; + if (chunkSizeInMB > 1984) { + chunkSizeInMB = 1984; + } + maxChunkSizeInBytes = chunkSizeInMB * 1024 * 1024; + fs = FileSystem.get(output.toUri(), conf); + currentChunkID = 0; + writer = new SequenceFile.Writer(fs, conf, getPath(currentChunkID), Text.class, Text.class); + } + + private Path getPath(int chunkID) { + return new Path(output, "chunk-" + chunkID); + } + + /** Writes a new key-value pair, creating a new sequence file if necessary.*/ + public void write(String key, String value) throws IOException { + if (currentChunkSize > maxChunkSizeInBytes) { + Closeables.close(writer, false); + currentChunkID++; + writer = new SequenceFile.Writer(fs, conf, getPath(currentChunkID), Text.class, Text.class); + currentChunkSize = 0; + } + + Text keyT = new Text(key); + Text valueT = new Text(value); + currentChunkSize += keyT.getBytes().length + valueT.getBytes().length; // Overhead + writer.append(keyT, valueT); + } + + @Override + public void close() throws IOException { + Closeables.close(writer, false); + } +} + http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/io/IOWriterWrapper.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/io/IOWriterWrapper.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/io/IOWriterWrapper.java new file mode 100644 index 0000000..b7c3d42 --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/io/IOWriterWrapper.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.utils.io; + +import java.io.IOException; +import java.io.Writer; +/** + * Implementation of the {@link WrappedWriter} interface based on {@link java.io.Writer}. + */ +public class IOWriterWrapper implements WrappedWriter { + + private final Writer writer; + + public IOWriterWrapper(Writer writer) { + this.writer = writer; + } + + /** Writes a new key and value, separating them with one space. The value must end with a + * new line or some other delimiter, as it is not automatically added by this method + */ + @Override + public void write(String key, String value) throws IOException { + writer.write(key + ' ' + value); + } + + @Override + public void close() throws IOException { + writer.close(); + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/io/WrappedWriter.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/io/WrappedWriter.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/io/WrappedWriter.java new file mode 100644 index 0000000..b9900e9 --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/io/WrappedWriter.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.utils.io; + +import java.io.Closeable; +import java.io.IOException; + +/** + * Convenience class for wrapping either a java.io.Writer or a SequenceFile.Writer with some basic functionality + */ +public interface WrappedWriter extends Closeable { + + /** Writes a new key-value pair.*/ + void write(String key, String value) throws IOException; + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/BloomTokenFilter.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/BloomTokenFilter.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/BloomTokenFilter.java new file mode 100644 index 0000000..964c8cc --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/BloomTokenFilter.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.mahout.utils.nlp.collocations.llr; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.CharsetEncoder; +import java.nio.charset.CodingErrorAction; + +import org.apache.commons.io.Charsets; +import org.apache.hadoop.util.bloom.Filter; +import org.apache.hadoop.util.bloom.Key; +import org.apache.lucene.analysis.TokenFilter; +import org.apache.lucene.analysis.TokenStream; +import org.apache.lucene.analysis.tokenattributes.CharTermAttribute; + +/** + * Emits tokens based on bloom filter membership. + */ +public final class BloomTokenFilter extends TokenFilter { + + private final Filter filter; + private final CharTermAttribute termAtt; + private final CharsetEncoder encoder; + private final Key key; + private final boolean keepMembers; + + /** + * @param filter tokens will be checked for membership in this bloom filter + * @param in the tokenstream to read. + * @param keepMembers keep memoers of the bloom filter? If true works like + * a whitelist and members found in the list are kept and all others are + * dropped. If false works like a stoplist and members found in the + * filter are dropped all others are kept. + */ + public BloomTokenFilter(Filter filter, boolean keepMembers, TokenStream in) { + super(in); + this.filter = filter; + this.keepMembers = keepMembers; + this.key = new Key(); + this.termAtt = addAttribute(CharTermAttribute.class); + this.encoder = Charsets.UTF_8.newEncoder(). + onMalformedInput(CodingErrorAction.REPORT). 
+ onUnmappableCharacter(CodingErrorAction.REPORT); + } + + @Override + public boolean incrementToken() throws IOException { + while (input.incrementToken()) { + ByteBuffer bytes = encoder.encode(CharBuffer.wrap(termAtt.buffer(), 0, termAtt.length())); + key.set(bytes.array(), 1.0f); + boolean member = filter.membershipTest(key); + if ((keepMembers && member) || (!keepMembers && !member)) { + return true; + } + } + return false; + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/AnalyzerTransformer.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/AnalyzerTransformer.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/AnalyzerTransformer.java new file mode 100644 index 0000000..4585a0a --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/AnalyzerTransformer.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.utils.regex; + +import java.io.IOException; +import java.io.StringReader; + +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.analysis.TokenStream; +import org.apache.lucene.analysis.standard.StandardAnalyzer; +import org.apache.lucene.analysis.tokenattributes.CharTermAttribute; +import org.apache.mahout.common.lucene.TokenStreamIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AnalyzerTransformer implements RegexTransformer { + + private Analyzer analyzer; + private String fieldName = "text"; + + private static final Logger log = LoggerFactory.getLogger(AnalyzerTransformer.class); + + public AnalyzerTransformer() { + this(new StandardAnalyzer(), "text"); + } + + public AnalyzerTransformer(Analyzer analyzer) { + this(analyzer, "text"); + } + + public AnalyzerTransformer(Analyzer analyzer, String fieldName) { + this.analyzer = analyzer; + this.fieldName = fieldName; + } + + @Override + public String transformMatch(String match) { + StringBuilder result = new StringBuilder(); + try (TokenStream ts = analyzer.tokenStream(fieldName, new StringReader(match))) { + ts.addAttribute(CharTermAttribute.class); + ts.reset(); + TokenStreamIterator iter = new TokenStreamIterator(ts); + while (iter.hasNext()) { + result.append(iter.next()).append(' '); + } + ts.end(); + } catch (IOException e) { + throw new IllegalStateException(e); + } + return result.toString(); + } + + public Analyzer getAnalyzer() { + return analyzer; + } + + public void setAnalyzer(Analyzer analyzer) { + this.analyzer = analyzer; + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/ChainTransformer.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/ChainTransformer.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/ChainTransformer.java new file mode 100644 index 0000000..d3e8e06 --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/ChainTransformer.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.utils.regex; + +import com.google.common.collect.Lists; + +import java.util.List; + +/** + * Chain together several {@link org.apache.mahout.utils.regex.RegexTransformer} and apply them to the match + * in succession + */ +public class ChainTransformer implements RegexTransformer { + + private List<RegexTransformer> chain = Lists.newArrayList(); + + public ChainTransformer() { + } + + public ChainTransformer(List<RegexTransformer> chain) { + this.chain = chain; + } + + @Override + public String transformMatch(String match) { + String result = match; + for (RegexTransformer transformer : chain) { + result = transformer.transformMatch(result); + } + return result; + } + + public List<RegexTransformer> getChain() { + return chain; + } + + public void setChain(List<RegexTransformer> chain) { + this.chain = chain; + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/FPGFormatter.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/FPGFormatter.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/FPGFormatter.java new file mode 100644 index 0000000..a0f296d --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/FPGFormatter.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.utils.regex; + +import java.util.regex.Pattern; + +/** + * Collapses/converts all whitespace to a single tab + */ +public class FPGFormatter implements RegexFormatter { + + private static final Pattern WHITESPACE = Pattern.compile("\\W+"); + + @Override + public String format(String toFormat) { + return '\t' + WHITESPACE.matcher(toFormat).replaceAll("|"); + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/IdentityFormatter.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/IdentityFormatter.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/IdentityFormatter.java new file mode 100644 index 0000000..5c1177c --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/IdentityFormatter.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.utils.regex; + +public class IdentityFormatter implements RegexFormatter { + + @Override + public String format(String toFormat) { + return toFormat; + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/IdentityTransformer.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/IdentityTransformer.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/IdentityTransformer.java new file mode 100644 index 0000000..aea695d --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/IdentityTransformer.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.utils.regex; + +/** + * No-op + */ +public final class IdentityTransformer implements RegexTransformer { + + @Override + public String transformMatch(String match) { + return match; + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/RegexConverterDriver.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/RegexConverterDriver.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/RegexConverterDriver.java new file mode 100644 index 0000000..53be239 --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/RegexConverterDriver.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.utils.regex; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.util.ToolRunner; +import org.apache.lucene.analysis.Analyzer; +import org.apache.mahout.common.AbstractJob; +import org.apache.mahout.common.HadoopUtil; +import org.apache.mahout.common.commandline.DefaultOptionCreator; + +/** + * Experimental + */ +public class RegexConverterDriver extends AbstractJob { + + @Override + public int run(String[] args) throws Exception { + addInputOption(); + addOutputOption(); + addOption(DefaultOptionCreator.overwriteOption().create()); + addOption("regex", "regex", + "The regular expression to use", true); + addOption("groupsToKeep", "g", + "The number of the capturing groups to keep", false); + addOption("transformerClass", "t", + "The optional class specifying the Regex Transformer", false); + addOption("formatterClass", "t", + "The optional class specifying the Regex Formatter", false); + addOption(DefaultOptionCreator.analyzerOption().create()); + + if (parseArguments(args) == null) { + return -1; + } + + Configuration conf = getConf(); + //TODO: How to deal with command line escaping? + conf.set(RegexMapper.REGEX, getOption("regex")); // + String gtk = getOption("groupsToKeep"); + if (gtk != null) { + conf.set(RegexMapper.GROUP_MATCHERS, gtk); + } + String trans = getOption("transformerClass"); + if (trans != null) { + if ("url".equalsIgnoreCase(trans)) { + trans = URLDecodeTransformer.class.getName(); + } + conf.set(RegexMapper.TRANSFORMER_CLASS, trans); + } + String formatter = getOption("formatterClass"); + if (formatter != null) { + if ("fpg".equalsIgnoreCase(formatter)) { + formatter = FPGFormatter.class.getName(); + } + conf.set(RegexMapper.FORMATTER_CLASS, formatter); + } + Path input = getInputPath(); + Path output = getOutputPath(); + if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) { + HadoopUtil.delete(getConf(), output); + } + Class<? extends Analyzer> analyzerClass = getAnalyzerClassFromOption(); + if (analyzerClass != null) { + conf.set(RegexMapper.ANALYZER_NAME, analyzerClass.getName()); + } + Job job = prepareJob(input, output, + TextInputFormat.class, + RegexMapper.class, + LongWritable.class, + Text.class, + TextOutputFormat.class); + boolean succeeded = job.waitForCompletion(true); + return succeeded ? 
0 : -1; + } + + public static void main(String[] args) throws Exception { + ToolRunner.run(new RegexConverterDriver(), args); + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/RegexFormatter.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/RegexFormatter.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/RegexFormatter.java new file mode 100644 index 0000000..8ef837b --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/RegexFormatter.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.utils.regex; + +public interface RegexFormatter { + + String format(String toFormat); + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/RegexMapper.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/RegexMapper.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/RegexMapper.java new file mode 100644 index 0000000..04cacaa --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/RegexMapper.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.utils.regex; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Pattern; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.lucene.analysis.Analyzer; +import org.apache.mahout.common.ClassUtils; + +public class RegexMapper extends Mapper<LongWritable, Text, LongWritable, Text> { + + public static final String REGEX = "regex"; + public static final String GROUP_MATCHERS = "regex.groups"; + public static final String TRANSFORMER_CLASS = "transformer.class"; + public static final String FORMATTER_CLASS = "formatter.class"; + + private Pattern regex; + private List<Integer> groupsToKeep; + private RegexTransformer transformer = RegexUtils.IDENTITY_TRANSFORMER; + private RegexFormatter formatter = RegexUtils.IDENTITY_FORMATTER; + public static final String ANALYZER_NAME = "analyzerName"; + + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + groupsToKeep = new ArrayList<>(); + Configuration config = context.getConfiguration(); + String regexStr = config.get(REGEX); + regex = Pattern.compile(regexStr); + String[] groups = config.getStrings(GROUP_MATCHERS); + if (groups != null) { + for (String group : groups) { + groupsToKeep.add(Integer.parseInt(group)); + } + } + + transformer = ClassUtils.instantiateAs(config.get(TRANSFORMER_CLASS, IdentityTransformer.class.getName()), + RegexTransformer.class); + String analyzerName = config.get(ANALYZER_NAME); + if (analyzerName != null && transformer instanceof AnalyzerTransformer) { + Analyzer analyzer = ClassUtils.instantiateAs(analyzerName, Analyzer.class); + ((AnalyzerTransformer)transformer).setAnalyzer(analyzer); + } + + formatter = ClassUtils.instantiateAs(config.get(FORMATTER_CLASS, IdentityFormatter.class.getName()), + RegexFormatter.class); + } + + + @Override + protected void map(LongWritable key, Text text, Context context) throws IOException, InterruptedException { + String result = RegexUtils.extract(text.toString(), regex, groupsToKeep, " ", transformer); + if (!result.isEmpty()) { + String format = formatter.format(result); + context.write(key, new Text(format)); + } + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/RegexTransformer.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/RegexTransformer.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/RegexTransformer.java new file mode 100644 index 0000000..adbc98f --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/RegexTransformer.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.utils.regex; + +/** + * Transforms the match of a regular expression. + */ +public interface RegexTransformer { + + String transformMatch(String match); + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/RegexUtils.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/RegexUtils.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/RegexUtils.java new file mode 100644 index 0000000..5e32b99 --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/RegexUtils.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.utils.regex; + +import java.util.Collection; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public final class RegexUtils { + + public static final RegexTransformer IDENTITY_TRANSFORMER = new IdentityTransformer(); + public static final RegexFormatter IDENTITY_FORMATTER = new IdentityFormatter(); + + private RegexUtils() { + } + + public static String extract(CharSequence line, Pattern pattern, Collection<Integer> groupsToKeep, + String separator, RegexTransformer transformer) { + StringBuilder bldr = new StringBuilder(); + extract(line, bldr, pattern, groupsToKeep, separator, transformer); + return bldr.toString(); + } + + public static void extract(CharSequence line, StringBuilder outputBuffer, + Pattern pattern, Collection<Integer> groupsToKeep, String separator, + RegexTransformer transformer) { + if (transformer == null) { + transformer = IDENTITY_TRANSFORMER; + } + Matcher matcher = pattern.matcher(line); + String match; + if (groupsToKeep.isEmpty()) { + while (matcher.find()) { + match = matcher.group(); + if (match != null) { + outputBuffer.append(transformer.transformMatch(match)).append(separator); + } + } + } else { + while (matcher.find()) { + for (Integer groupNum : groupsToKeep) { + match = matcher.group(groupNum); + if (match != null) { + outputBuffer.append(transformer.transformMatch(match)).append(separator); + } + } + } + } + //trim off the last separator, which is always there + if (outputBuffer.length() > 0) { + outputBuffer.setLength(outputBuffer.length() - separator.length()); + } + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/URLDecodeTransformer.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/URLDecodeTransformer.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/URLDecodeTransformer.java new file mode 100644 index 0000000..3eb7fc0 --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/URLDecodeTransformer.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.utils.regex; + +import java.io.UnsupportedEncodingException; +import java.net.URLDecoder; + +public final class URLDecodeTransformer implements RegexTransformer { + + private final String enc; + + public URLDecodeTransformer() { + enc = "UTF-8"; + } + + public URLDecodeTransformer(String encoding) { + this.enc = encoding; + } + + @Override + public String transformMatch(String match) { + try { + return URLDecoder.decode(match, enc); + } catch (UnsupportedEncodingException e) { + throw new IllegalStateException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/RowIdJob.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/RowIdJob.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/RowIdJob.java new file mode 100644 index 0000000..13d61b8 --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/RowIdJob.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.utils.vectors; + +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.util.ToolRunner; +import org.apache.mahout.common.AbstractJob; +import org.apache.mahout.common.Pair; +import org.apache.mahout.common.iterator.sequencefile.PathFilters; +import org.apache.mahout.common.iterator.sequencefile.PathType; +import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable; +import org.apache.mahout.math.VectorWritable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Converts a vector representation of documents into a {@code document x terms} matrix. + * The input data is in {@code SequenceFile<Text,VectorWritable>} format (as generated by + * {@link org.apache.mahout.vectorizer.SparseVectorsFromSequenceFiles SparseVectorsFromSequenceFiles} + * or by {@link org.apache.mahout.vectorizer.EncodedVectorsFromSequenceFiles EncodedVectorsFromSequenceFiles}) + * and generates the following two files as output: + * <ul><li>A file called "matrix" of format {@code SequenceFile<IntWritable,VectorWritable>}.</li> + * <li>A file called "docIndex" of format {@code SequenceFile<IntWritable,Text>}.</li></ul> + * The input file can be regenerated by joining the two output files on the generated int key. 
+ * In other words, {@code RowIdJob} replaces the document text ids by integers. + * The original document text ids can still be retrieved from the "docIndex". + */ +public class RowIdJob extends AbstractJob { + private static final Logger log = LoggerFactory.getLogger(RowIdJob.class); + + @Override + public int run(String[] args) throws Exception { + + addInputOption(); + addOutputOption(); + + Map<String, List<String>> parsedArgs = parseArguments(args); + if (parsedArgs == null) { + return -1; + } + + Configuration conf = getConf(); + FileSystem fs = FileSystem.get(conf); + + Path outputPath = getOutputPath(); + Path indexPath = new Path(outputPath, "docIndex"); + Path matrixPath = new Path(outputPath, "matrix"); + + try (SequenceFile.Writer indexWriter = SequenceFile.createWriter(fs, conf, indexPath, + IntWritable.class, Text.class); + SequenceFile.Writer matrixWriter = SequenceFile.createWriter(fs, conf, matrixPath, IntWritable.class, + VectorWritable.class)) { + IntWritable docId = new IntWritable(); + int i = 0; + int numCols = 0; + for (Pair<Text, VectorWritable> record + : new SequenceFileDirIterable<Text, VectorWritable>(getInputPath(), PathType.LIST, PathFilters.logsCRCFilter(), + null, true, conf)) { + VectorWritable value = record.getSecond(); + docId.set(i); + indexWriter.append(docId, record.getFirst()); + matrixWriter.append(docId, value); + i++; + numCols = value.get().size(); + } + + log.info("Wrote out matrix with {} rows and {} columns to {}", i, numCols, matrixPath); + return 0; + } + } + + public static void main(String[] args) throws Exception { + ToolRunner.run(new RowIdJob(), args); + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/TermEntry.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/TermEntry.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/TermEntry.java new file mode 100644 index 0000000..d74803f --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/TermEntry.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.utils.vectors; + +/** + * Each entry in a {@link TermInfo} dictionary. Contains information about a term. 
+ */ +public class TermEntry { + + private final String term; + private final int termIdx; + private final int docFreq; + + public TermEntry(String term, int termIdx, int docFreq) { + this.term = term; + this.termIdx = termIdx; + this.docFreq = docFreq; + } + + public String getTerm() { + return term; + } + + public int getTermIdx() { + return termIdx; + } + + public int getDocFreq() { + return docFreq; + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/TermInfo.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/TermInfo.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/TermInfo.java new file mode 100644 index 0000000..4fb36a3 --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/TermInfo.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.utils.vectors; + +import java.util.Iterator; + +/** + * Contains the term dictionary information associated with a vectorized collection of text documents + * + */ +public interface TermInfo { + + int totalTerms(String field); + + TermEntry getTermEntry(String field, String term); + + Iterator<TermEntry> getAllEntries(); +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/VectorDumper.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/VectorDumper.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/VectorDumper.java new file mode 100644 index 0000000..e1c3fbc --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/VectorDumper.java @@ -0,0 +1,266 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.utils.vectors; + +import com.google.common.collect.Sets; +import com.google.common.io.Closeables; +import com.google.common.io.Files; +import org.apache.commons.io.Charsets; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.util.ToolRunner; +import org.apache.mahout.clustering.classify.WeightedPropertyVectorWritable; +import org.apache.mahout.common.AbstractJob; +import org.apache.mahout.common.Pair; +import org.apache.mahout.common.iterator.sequencefile.PathFilters; +import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable; +import org.apache.mahout.math.NamedVector; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorWritable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.util.Iterator; +import java.util.Set; + +/** + * Can read in a {@link org.apache.hadoop.io.SequenceFile} of {@link Vector}s and dump + * out the results using {@link Vector#asFormatString()} to either the console or to a + * file. + */ +public final class VectorDumper extends AbstractJob { + + private static final Logger log = LoggerFactory.getLogger(VectorDumper.class); + + private VectorDumper() { + } + + @Override + public int run(String[] args) throws Exception { + /** + Option seqOpt = obuilder.withLongName("seqFile").withRequired(false).withArgument( + abuilder.withName("seqFile").withMinimum(1).withMaximum(1).create()).withDescription( + "The Sequence File containing the Vectors").withShortName("s").create(); + Option dirOpt = obuilder.withLongName("seqDirectory").withRequired(false).withArgument( + abuilder.withName("seqDirectory").withMinimum(1).withMaximum(1).create()) + .withDescription("The directory containing Sequence File of Vectors") + .withShortName("d").create(); + */ + addInputOption(); + addOutputOption(); + addOption("useKey", "u", "If the Key is a vector than dump that instead"); + addOption("printKey", "p", "Print out the key as well, delimited by tab (or the value if useKey is true"); + addOption("dictionary", "d", "The dictionary file.", false); + addOption("dictionaryType", "dt", "The dictionary file type (text|seqfile)", false); + addOption("csv", "c", "Output the Vector as CSV. Otherwise it substitutes in the terms for vector cell entries"); + addOption("namesAsComments", "n", "If using CSV output, optionally add a comment line for each NamedVector " + + "(if the vector is one) printing out the name"); + addOption("nameOnly", "N", "Use the name as the value for each NamedVector (skip other vectors)"); + addOption("sortVectors", "sort", "Sort output key/value pairs of the vector entries in abs magnitude " + + "descending order"); + addOption("quiet", "q", "Print only file contents"); + addOption("sizeOnly", "sz", "Dump only the size of the vector"); + addOption("numItems", "ni", "Output at most <n> vecors", false); + addOption("vectorSize", "vs", "Truncate vectors to <vs> length when dumping (most useful when in" + + " conjunction with -sort", false); + addOption(buildOption("filter", "fi", "Only dump out those vectors whose name matches the filter." 
+ + " Multiple items may be specified by repeating the argument.", true, 1, Integer.MAX_VALUE, false, null)); + + if (parseArguments(args, false, true) == null) { + return -1; + } + + Path[] pathArr; + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.get(conf); + Path input = getInputPath(); + FileStatus fileStatus = fs.getFileStatus(input); + if (fileStatus.isDir()) { + pathArr = FileUtil.stat2Paths(fs.listStatus(input, PathFilters.logsCRCFilter())); + } else { + FileStatus[] inputPaths = fs.globStatus(input); + pathArr = new Path[inputPaths.length]; + int i = 0; + for (FileStatus fstatus : inputPaths) { + pathArr[i++] = fstatus.getPath(); + } + } + + + String dictionaryType = getOption("dictionaryType", "text"); + + boolean sortVectors = hasOption("sortVectors"); + boolean quiet = hasOption("quiet"); + if (!quiet) { + log.info("Sort? {}", sortVectors); + } + + String[] dictionary = null; + if (hasOption("dictionary")) { + String dictFile = getOption("dictionary"); + switch (dictionaryType) { + case "text": + dictionary = VectorHelper.loadTermDictionary(new File(dictFile)); + break; + case "sequencefile": + dictionary = VectorHelper.loadTermDictionary(conf, dictFile); + break; + default: + //TODO: support Lucene's FST as a dictionary type + throw new IOException("Invalid dictionary type: " + dictionaryType); + } + } + + Set<String> filters; + if (hasOption("filter")) { + filters = Sets.newHashSet(getOptions("filter")); + } else { + filters = null; + } + + boolean useCSV = hasOption("csv"); + + boolean sizeOnly = hasOption("sizeOnly"); + boolean nameOnly = hasOption("nameOnly"); + boolean namesAsComments = hasOption("namesAsComments"); + boolean transposeKeyValue = hasOption("vectorAsKey"); + Writer writer; + boolean shouldClose; + File output = getOutputFile(); + if (output != null) { + shouldClose = true; + log.info("Output file: {}", output); + Files.createParentDirs(output); + writer = Files.newWriter(output, Charsets.UTF_8); + } else { + shouldClose = false; + writer = new OutputStreamWriter(System.out, Charsets.UTF_8); + } + try { + boolean printKey = hasOption("printKey"); + if (useCSV && dictionary != null) { + writer.write("#"); + for (int j = 0; j < dictionary.length; j++) { + writer.write(dictionary[j]); + if (j < dictionary.length - 1) { + writer.write(','); + } + } + writer.write('\n'); + } + Long numItems = null; + if (hasOption("numItems")) { + numItems = Long.parseLong(getOption("numItems")); + if (quiet) { + writer.append("#Max Items to dump: ").append(String.valueOf(numItems)).append('\n'); + } + } + int maxIndexesPerVector = hasOption("vectorSize") + ? Integer.parseInt(getOption("vectorSize")) + : Integer.MAX_VALUE; + long itemCount = 0; + int fileCount = 0; + for (Path path : pathArr) { + if (numItems != null && numItems <= itemCount) { + break; + } + if (quiet) { + log.info("Processing file '{}' ({}/{})", path, ++fileCount, pathArr.length); + } + SequenceFileIterable<Writable, Writable> iterable = new SequenceFileIterable<>(path, true, conf); + Iterator<Pair<Writable, Writable>> iterator = iterable.iterator(); + long i = 0; + while (iterator.hasNext() && (numItems == null || itemCount < numItems)) { + Pair<Writable, Writable> record = iterator.next(); + Writable keyWritable = record.getFirst(); + Writable valueWritable = record.getSecond(); + if (printKey) { + Writable notTheVectorWritable = transposeKeyValue ? 
valueWritable : keyWritable; + writer.write(notTheVectorWritable.toString()); + writer.write('\t'); + } + Vector vector; + try { + vector = ((VectorWritable) + (transposeKeyValue ? keyWritable : valueWritable)).get(); + } catch (ClassCastException e) { + if ((transposeKeyValue ? keyWritable : valueWritable) + instanceof WeightedPropertyVectorWritable) { + vector = + ((WeightedPropertyVectorWritable) + (transposeKeyValue ? keyWritable : valueWritable)).getVector(); + } else { + throw e; + } + } + if (filters == null + || !(vector instanceof NamedVector) + || filters.contains(((NamedVector) vector).getName())) { + if (sizeOnly) { + if (vector instanceof NamedVector) { + writer.write(((NamedVector) vector).getName()); + writer.write(":"); + } else { + writer.write(String.valueOf(i++)); + writer.write(":"); + } + writer.write(String.valueOf(vector.size())); + writer.write('\n'); + } else if (nameOnly) { + if (vector instanceof NamedVector) { + writer.write(((NamedVector) vector).getName()); + writer.write('\n'); + } + } else { + String fmtStr; + if (useCSV) { + fmtStr = VectorHelper.vectorToCSVString(vector, namesAsComments); + } else { + fmtStr = VectorHelper.vectorToJson(vector, dictionary, maxIndexesPerVector, + sortVectors); + } + writer.write(fmtStr); + writer.write('\n'); + } + itemCount++; + } + } + } + writer.flush(); + } finally { + if (shouldClose) { + Closeables.close(writer, false); + } + } + + return 0; + } + + public static void main(String[] args) throws Exception { + ToolRunner.run(new Configuration(), new VectorDumper(), args); + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/VectorHelper.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/VectorHelper.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/VectorHelper.java new file mode 100644 index 0000000..66c3fb6 --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/VectorHelper.java @@ -0,0 +1,256 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.utils.vectors; + +import com.google.common.base.Function; +import com.google.common.collect.Collections2; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.lucene.util.PriorityQueue; +import org.apache.mahout.common.Pair; +import org.apache.mahout.common.iterator.FileLineIterator; +import org.apache.mahout.common.iterator.sequencefile.PathType; +import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable; +import org.apache.mahout.math.NamedVector; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.Vector.Element; +import org.apache.mahout.math.map.OpenObjectIntHashMap; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.regex.Pattern; + +/** Static utility methods related to vectors. */ +public final class VectorHelper { + + private static final Pattern TAB_PATTERN = Pattern.compile("\t"); + + + private VectorHelper() { + } + + public static String vectorToCSVString(Vector vector, boolean namesAsComments) throws IOException { + Appendable bldr = new StringBuilder(2048); + vectorToCSVString(vector, namesAsComments, bldr); + return bldr.toString(); + } + + public static String buildJson(Iterable<Pair<String, Double>> iterable) { + return buildJson(iterable, new StringBuilder(2048)); + } + + public static String buildJson(Iterable<Pair<String, Double>> iterable, StringBuilder bldr) { + bldr.append('{'); + for (Pair<String, Double> p : iterable) { + bldr.append(p.getFirst()); + bldr.append(':'); + bldr.append(p.getSecond()); + bldr.append(','); + } + if (bldr.length() > 1) { + bldr.setCharAt(bldr.length() - 1, '}'); + } + return bldr.toString(); + } + + public static List<Pair<Integer, Double>> topEntries(Vector vector, int maxEntries) { + + // Get the size of nonZero elements in the input vector + int sizeOfNonZeroElementsInVector = vector.getNumNonZeroElements(); + + // If the sizeOfNonZeroElementsInVector < maxEntries then set maxEntries = sizeOfNonZeroElementsInVector + // otherwise the call to queue.pop() returns a Pair(null, null) and the subsequent call + // to pair.getFirst() throws a NullPointerException + if (sizeOfNonZeroElementsInVector < maxEntries) { + maxEntries = sizeOfNonZeroElementsInVector; + } + + PriorityQueue<Pair<Integer, Double>> queue = new TDoublePQ<>(-1, maxEntries); + for (Element e : vector.nonZeroes()) { + queue.insertWithOverflow(Pair.of(e.index(), e.get())); + } + List<Pair<Integer, Double>> entries = new ArrayList<>(); + Pair<Integer, Double> pair; + while ((pair = queue.pop()) != null) { + if (pair.getFirst() > -1) { + entries.add(pair); + } + } + Collections.sort(entries, new Comparator<Pair<Integer, Double>>() { + @Override + public int compare(Pair<Integer, Double> a, Pair<Integer, Double> b) { + return b.getSecond().compareTo(a.getSecond()); + } + }); + return entries; + } + + public static List<Pair<Integer, Double>> firstEntries(Vector vector, int maxEntries) { + List<Pair<Integer, Double>> entries = new ArrayList<>(); + Iterator<Vector.Element> it = vector.nonZeroes().iterator(); + int i = 0; + while (it.hasNext() && i++ < maxEntries) { + Vector.Element e = it.next(); + entries.add(Pair.of(e.index(), 
e.get())); + } + return entries; + } + + public static List<Pair<String, Double>> toWeightedTerms(Collection<Pair<Integer, Double>> entries, + final String[] dictionary) { + if (dictionary != null) { + return new ArrayList<>(Collections2.transform(entries, + new Function<Pair<Integer, Double>, Pair<String, Double>>() { + @Override + public Pair<String, Double> apply(Pair<Integer, Double> p) { + return Pair.of(dictionary[p.getFirst()], p.getSecond()); + } + })); + } else { + return new ArrayList<>(Collections2.transform(entries, + new Function<Pair<Integer, Double>, Pair<String, Double>>() { + @Override + public Pair<String, Double> apply(Pair<Integer, Double> p) { + return Pair.of(Integer.toString(p.getFirst()), p.getSecond()); + } + })); + } + } + + public static String vectorToJson(Vector vector, String[] dictionary, int maxEntries, boolean sort) { + return buildJson(toWeightedTerms(sort + ? topEntries(vector, maxEntries) + : firstEntries(vector, maxEntries), dictionary)); + } + + public static void vectorToCSVString(Vector vector, + boolean namesAsComments, + Appendable bldr) throws IOException { + if (namesAsComments && vector instanceof NamedVector) { + bldr.append('#').append(((NamedVector) vector).getName()).append('\n'); + } + Iterator<Vector.Element> iter = vector.all().iterator(); + boolean first = true; + while (iter.hasNext()) { + if (first) { + first = false; + } else { + bldr.append(','); + } + Vector.Element elt = iter.next(); + bldr.append(String.valueOf(elt.get())); + } + bldr.append('\n'); + } + + /** + * Read in a dictionary file. Format is: + * <p/> + * <pre> + * term DocFreq Index + * </pre> + */ + public static String[] loadTermDictionary(File dictFile) throws IOException { + try (InputStream in = new FileInputStream(dictFile)) { + return loadTermDictionary(in); + } + } + + /** + * Read a dictionary in {@link org.apache.hadoop.io.SequenceFile} generated by + * {@link org.apache.mahout.vectorizer.DictionaryVectorizer} + * + * @param filePattern <PATH TO DICTIONARY>/dictionary.file-* + */ + public static String[] loadTermDictionary(Configuration conf, String filePattern) { + OpenObjectIntHashMap<String> dict = new OpenObjectIntHashMap<>(); + int maxIndexValue = 0; + for (Pair<Text, IntWritable> record + : new SequenceFileDirIterable<Text, IntWritable>(new Path(filePattern), PathType.GLOB, null, null, true, + conf)) { + dict.put(record.getFirst().toString(), record.getSecond().get()); + if (record.getSecond().get() > maxIndexValue) { + maxIndexValue = record.getSecond().get(); + } + } + // Set dictionary size to greater of (maxIndexValue + 1, dict.size()) + int maxDictionarySize = maxIndexValue + 1 > dict.size() ? maxIndexValue + 1 : dict.size(); + String[] dictionary = new String[maxDictionarySize]; + for (String feature : dict.keys()) { + dictionary[dict.get(feature)] = feature; + } + return dictionary; + } + + /** + * Read in a dictionary file. 
Format is: First line is the number of entries + * <p/> + * <pre> + * term DocFreq Index + * </pre> + */ + private static String[] loadTermDictionary(InputStream is) throws IOException { + FileLineIterator it = new FileLineIterator(is); + + int numEntries = Integer.parseInt(it.next()); + String[] result = new String[numEntries]; + + while (it.hasNext()) { + String line = it.next(); + if (line.startsWith("#")) { + continue; + } + String[] tokens = TAB_PATTERN.split(line); + if (tokens.length < 3) { + continue; + } + int index = Integer.parseInt(tokens[2]); // tokens[1] is the doc freq + result[index] = tokens[0]; + } + return result; + } + + private static final class TDoublePQ<T> extends PriorityQueue<Pair<T, Double>> { + private final T sentinel; + + private TDoublePQ(T sentinel, int size) { + super(size); + this.sentinel = sentinel; + } + + @Override + protected boolean lessThan(Pair<T, Double> a, Pair<T, Double> b) { + return a.getSecond().compareTo(b.getSecond()) < 0; + } + + @Override + protected Pair<T, Double> getSentinelObject() { + return Pair.of(sentinel, Double.NEGATIVE_INFINITY); + } + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/arff/ARFFIterator.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/arff/ARFFIterator.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/arff/ARFFIterator.java new file mode 100644 index 0000000..f2632a4 --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/arff/ARFFIterator.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.utils.vectors.arff; + +import java.io.BufferedReader; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import com.google.common.collect.AbstractIterator; +import com.google.common.io.Closeables; +import org.apache.mahout.math.DenseVector; +import org.apache.mahout.math.RandomAccessSparseVector; +import org.apache.mahout.math.Vector; + +final class ARFFIterator extends AbstractIterator<Vector> { + + // This pattern will make sure a , inside a string is not a point for split. 
+ // Ex: "Arizona" , "0:08 PM, PDT" , 110 will be split considering "0:08 PM, PDT" as one string + private static final Pattern WORDS_WITHOUT_SPARSE = Pattern.compile("([\\w[^{]])*"); + private static final Pattern DATA_PATTERN = Pattern.compile("^\\"+ARFFModel.ARFF_SPARSE+"(.*)\\"+ARFFModel.ARFF_SPARSE_END+"$"); + + private final BufferedReader reader; + private final ARFFModel model; + + ARFFIterator(BufferedReader reader, ARFFModel model) { + this.reader = reader; + this.model = model; + } + + @Override + protected Vector computeNext() { + String line; + try { + while ((line = reader.readLine()) != null) { + line = line.trim(); + if (!line.isEmpty() && !line.startsWith(ARFFModel.ARFF_COMMENT)) { + break; + } + } + } catch (IOException ioe) { + throw new IllegalStateException(ioe); + } + if (line == null) { + try { + Closeables.close(reader, true); + } catch (IOException e) { + throw new IllegalStateException(e); + } + return endOfData(); + } + Vector result; + Matcher contents = DATA_PATTERN.matcher(line); + if (contents.find()) { + line = contents.group(1); + String[] splits = splitCSV(line); + result = new RandomAccessSparseVector(model.getLabelSize()); + for (String split : splits) { + int idIndex = split.indexOf(' '); + int idx = Integer.parseInt(split.substring(0, idIndex).trim()); + String data = split.substring(idIndex).trim(); + if (!"?".equals(data)) { + result.setQuick(idx, model.getValue(data, idx)); + } + } + } else { + result = new DenseVector(model.getLabelSize()); + String[] splits = splitCSV(line); + for (int i = 0; i < splits.length; i++) { + String split = splits[i]; + split = split.trim(); + if (WORDS_WITHOUT_SPARSE.matcher(split).matches() && !"?".equals(split)) { + result.setQuick(i, model.getValue(split, i)); + } + } + } + return result; + } + + /** + * Splits a string by comma, ignores commas inside quotes and escaped quotes. 
+ * As quotes are both double and single possible, because there is no exact definition + * for ARFF files + * @param line - + * @return String[] + */ + public static String[] splitCSV(String line) { + StringBuilder sb = new StringBuilder(128); + List<String> tokens = new ArrayList<>(); + char escapeChar = '\0'; + for (int i = 0; i < line.length(); i++) { + char c = line.charAt(i); + if (c == '\\') { + i++; + sb.append(line.charAt(i)); + } + else if (c == '"' || c == '\'') { + // token is closed + if (c == escapeChar) { + escapeChar = '\0'; + } + else if (escapeChar == '\0') { + escapeChar = c; + } + sb.append(c); + } + else if (c == ',') { + if (escapeChar == '\0') { + tokens.add(sb.toString().trim()); + sb.setLength(0); // start work on next token + } + else { + sb.append(c); + } + } + else { + sb.append(c); + } + } + if (sb.length() > 0) { + tokens.add(sb.toString().trim()); + } + + return tokens.toArray(new String[tokens.size()]); + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/arff/ARFFModel.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/arff/ARFFModel.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/arff/ARFFModel.java new file mode 100644 index 0000000..fc86997 --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/arff/ARFFModel.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.utils.vectors.arff; + +import java.text.DateFormat; +import java.util.Map; + +/** + * An interface for representing an ARFFModel. Implementations can decide on the best approach + * for storing the model, as some approaches will be fine for smaller files, while larger + * ones may require a better implementation. 
+ */ +public interface ARFFModel { + String ARFF_SPARSE = "{"; //indicates the vector is sparse + String ARFF_SPARSE_END = "}"; + String ARFF_COMMENT = "%"; + String ATTRIBUTE = "@attribute"; + String DATA = "@data"; + String RELATION = "@relation"; + + + String getRelation(); + + void setRelation(String relation); + + /** + * The vector attributes (labels in Mahout speak) + * @return the map + */ + Map<String, Integer> getLabelBindings(); + + Integer getNominalValue(String label, String nominal); + + void addNominal(String label, String nominal, int idx); + + DateFormat getDateFormat(Integer idx); + + void addDateFormat(Integer idx, DateFormat format); + + Integer getLabelIndex(String label); + + void addLabel(String label, Integer idx); + + ARFFType getARFFType(Integer idx); + + void addType(Integer idx, ARFFType type); + + /** + * The count of the number of words seen + * @return the count + */ + long getWordCount(); + + double getValue(String data, int idx); + + Map<String, Map<String, Integer>> getNominalMap(); + + int getLabelSize(); + + Map<String, Long> getWords(); +} http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/arff/ARFFType.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/arff/ARFFType.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/arff/ARFFType.java new file mode 100644 index 0000000..9ba7c31 --- /dev/null +++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/arff/ARFFType.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.utils.vectors.arff; + +public enum ARFFType { + + NUMERIC("numeric"), + INTEGER("integer"), + REAL("real"), + NOMINAL("{"), + DATE("date"), + STRING("string"); + + private final String indicator; + + ARFFType(String indicator) { + this.indicator = indicator; + } + + public String getIndicator() { + return indicator; + } + + public String getLabel(String line) { + int idx = line.lastIndexOf(indicator); + return removeQuotes(line.substring(ARFFModel.ATTRIBUTE.length(), idx)); + } + + /** + * Remove quotes and leading/trailing whitespace from a single or double quoted string + * @param str quotes from + * @return A string without quotes + */ + public static String removeQuotes(String str) { + String cleaned = str; + if (cleaned != null) { + cleaned = cleaned.trim(); + boolean isQuoted = cleaned.length() > 1 + && (cleaned.startsWith("\"") && cleaned.endsWith("\"") + || cleaned.startsWith("'") && cleaned.endsWith("'")); + if (isQuoted) { + cleaned = cleaned.substring(1, cleaned.length() - 1); + } + } + return cleaned; + } +}
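
For reference, a minimal sketch of how the ARFF tokenizing helpers introduced above behave. This is not part of the commit: the class name ArffSplitSketch is made up, and it is declared in the same package only because ARFFIterator is package-private; splitCSV keeps the surrounding quotes on each token, while ARFFType.removeQuotes strips them afterwards.

// Hedged usage sketch, not part of the patch. Demonstrates ARFFIterator.splitCSV
// and ARFFType.removeQuotes on a line containing a quoted comma.
package org.apache.mahout.utils.vectors.arff; // same package: ARFFIterator is package-private

import java.util.Arrays;

public class ArffSplitSketch { // hypothetical helper class

  public static void main(String[] args) {
    // The comma inside the quoted timestamp does not split the field;
    // the quote characters themselves are kept on the tokens.
    String line = "\"Arizona\",\"0:08 PM, PDT\",110";
    String[] tokens = ARFFIterator.splitCSV(line);
    System.out.println(Arrays.toString(tokens));
    // prints: ["Arizona", "0:08 PM, PDT", 110]   (three tokens, quotes retained)

    // Surrounding single or double quotes plus leading/trailing whitespace are removed.
    System.out.println(ARFFType.removeQuotes("  'partly cloudy'  "));
    // prints: partly cloudy
  }
}

Both single and double quotes are honored because the ARFF specification does not mandate one form, which is why splitCSV tracks whichever quote character opened the current field.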

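
A similarly informal sketch of how the VectorHelper utilities above fit together, assuming a text dictionary file (hypothetical name dictionary.txt, first line = entry count, then term<TAB>docFreq<TAB>index per line, with at least a couple of terms). The sketch is not part of the commit; it only exercises methods defined in this patch plus standard Mahout math classes.

// Hedged usage sketch, not part of the patch.
import java.io.File;

import org.apache.mahout.math.NamedVector;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.utils.vectors.VectorHelper;

public class VectorHelperSketch { // hypothetical driver class

  public static void main(String[] args) throws Exception {
    // Load a term dictionary in the text format expected by loadTermDictionary(File).
    String[] dictionary = VectorHelper.loadTermDictionary(new File("dictionary.txt")); // hypothetical path

    // Build a small sparse vector over that dictionary (assumes it has at least two terms).
    Vector v = new RandomAccessSparseVector(dictionary.length);
    v.setQuick(0, 0.5);
    v.setQuick(1, 2.0);
    Vector named = new NamedVector(v, "doc-42");

    // JSON-style string of the two largest entries, with indices resolved through the dictionary.
    System.out.println(VectorHelper.vectorToJson(named, dictionary, 2, true));

    // CSV form; the vector's name is emitted first as a '#' comment line.
    System.out.println(VectorHelper.vectorToCSVString(named, true));
  }
}

This mirrors what VectorDumper does per record: it pulls the Vector out of each sequence-file value (or key, when the key/value roles are swapped) and hands it to vectorToJson or vectorToCSVString depending on the options.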