http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/DmozParser.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/DmozParser.java b/nutch-core/src/main/java/org/apache/nutch/tools/DmozParser.java new file mode 100644 index 0000000..54ec543 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/tools/DmozParser.java @@ -0,0 +1,391 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nutch.tools; + +import java.io.*; +import java.util.*; +import java.util.regex.*; + +import javax.xml.parsers.*; +import org.xml.sax.*; +import org.xml.sax.helpers.*; +import org.apache.xerces.util.XMLChar; + +// Slf4j Logging imports +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.io.*; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.conf.Configuration; +import org.apache.nutch.util.NutchConfiguration; + +/** Utility that converts DMOZ RDF into a flat file of URLs to be injected. */ +public class DmozParser { + public static final Logger LOG = LoggerFactory.getLogger(DmozParser.class); + + long pages = 0; + + /** + * This filter fixes characters that might offend our parser. This lets us be + * tolerant of errors that might appear in the input XML. + */ + private static class XMLCharFilter extends FilterReader { + private boolean lastBad = false; + + public XMLCharFilter(Reader reader) { + super(reader); + } + + public int read() throws IOException { + int c = in.read(); + int value = c; + if (c != -1 && !(XMLChar.isValid(c))) // fix invalid characters + value = 'X'; + else if (lastBad && c == '<') { // fix mis-matched brackets + in.mark(1); + if (in.read() != '/') + value = 'X'; + in.reset(); + } + lastBad = (c == 65533); + + return value; + } + + public int read(char[] cbuf, int off, int len) throws IOException { + int n = in.read(cbuf, off, len); + if (n != -1) { + for (int i = 0; i < n; i++) { + char c = cbuf[off + i]; + char value = c; + if (!(XMLChar.isValid(c))) // fix invalid characters + value = 'X'; + else if (lastBad && c == '<') { // fix mis-matched brackets + if (i != n - 1 && cbuf[off + i + 1] != '/') + value = 'X'; + } + lastBad = (c == 65533); + cbuf[off + i] = value; + } + } + return n; + } + } + + /** + * The RDFProcessor receives tag messages during a parse of RDF XML data. We + * build whatever structures we need from these messages. 
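+   *
+   * <p>For orientation, the handler below keys off DMOZ RDF elements shaped
+   * roughly as follows (abridged; the element and attribute names are the
+   * ones consulted in the callbacks):</p>
+   *
+   * <pre>{@code
+   * <Topic r:id="Top/Arts/Movies"/>
+   * <ExternalPage about="http://example.com/">
+   *   <d:Title>Example</d:Title>
+   *   <d:Description>An example page.</d:Description>
+   * </ExternalPage>
+   * }</pre>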
+ */ + private class RDFProcessor extends DefaultHandler { + String curURL = null, curSection = null; + boolean titlePending = false, descPending = false, + insideAdultSection = false; + Pattern topicPattern = null; + StringBuffer title = new StringBuffer(), desc = new StringBuffer(); + XMLReader reader; + int subsetDenom; + int hashSkew; + boolean includeAdult; + Locator location; + + /** + * Pass in an XMLReader, plus a flag as to whether we should include adult + * material. + */ + public RDFProcessor(XMLReader reader, int subsetDenom, + boolean includeAdult, int skew, Pattern topicPattern) + throws IOException { + this.reader = reader; + this.subsetDenom = subsetDenom; + this.includeAdult = includeAdult; + this.topicPattern = topicPattern; + + this.hashSkew = skew != 0 ? skew : new Random().nextInt(); + } + + // + // Interface ContentHandler + // + + /** + * Start of an XML elt + */ + public void startElement(String namespaceURI, String localName, + String qName, Attributes atts) throws SAXException { + if ("Topic".equals(qName)) { + curSection = atts.getValue("r:id"); + } else if ("ExternalPage".equals(qName)) { + // Porn filter + if ((!includeAdult) && curSection.startsWith("Top/Adult")) { + return; + } + + if (topicPattern != null && !topicPattern.matcher(curSection).matches()) { + return; + } + + // Subset denominator filter. + // Only emit with a chance of 1/denominator. + String url = atts.getValue("about"); + int hashValue = MD5Hash.digest(url).hashCode(); + hashValue = Math.abs(hashValue ^ hashSkew); + if ((hashValue % subsetDenom) != 0) { + return; + } + + // We actually claim the URL! + curURL = url; + } else if (curURL != null && "d:Title".equals(qName)) { + titlePending = true; + } else if (curURL != null && "d:Description".equals(qName)) { + descPending = true; + } + } + + /** + * The contents of an XML elt + */ + public void characters(char ch[], int start, int length) { + if (titlePending) { + title.append(ch, start, length); + } else if (descPending) { + desc.append(ch, start, length); + } + } + + /** + * Termination of XML elt + */ + public void endElement(String namespaceURI, String localName, String qName) + throws SAXException { + if (curURL != null) { + if ("ExternalPage".equals(qName)) { + // + // Inc the number of pages, insert the page, and + // possibly print status. + // + System.out.println(curURL); + pages++; + + // + // Clear out the link text. This is what + // you would use for adding to the linkdb. + // + if (title.length() > 0) { + title.delete(0, title.length()); + } + if (desc.length() > 0) { + desc.delete(0, desc.length()); + } + + // Null out the URL. + curURL = null; + } else if ("d:Title".equals(qName)) { + titlePending = false; + } else if ("d:Description".equals(qName)) { + descPending = false; + } + } + } + + /** + * When parsing begins + */ + public void startDocument() { + LOG.info("Begin parse"); + } + + /** + * When parsing ends + */ + public void endDocument() { + LOG.info("Completed parse. Found " + pages + " pages."); + } + + /** + * From time to time the Parser will set the "current location" by calling + * this function. It's useful for emitting locations for error messages. 
+     */
+    public void setDocumentLocator(Locator locator) {
+      location = locator;
+    }
+
+    //
+    // Interface ErrorHandler
+    //
+
+    /**
+     * Emit the exception message
+     */
+    public void error(SAXParseException spe) {
+      if (LOG.isErrorEnabled()) {
+        LOG.error("Error: " + spe.toString() + ": " + spe.getMessage());
+      }
+    }
+
+    /**
+     * Emit the fatal error message, with line numbers
+     */
+    public void fatalError(SAXParseException spe) {
+      if (LOG.isErrorEnabled()) {
+        LOG.error("Fatal error: " + spe.toString() + ": " + spe.getMessage());
+        LOG.error("Last known line is " + location.getLineNumber()
+            + ", column " + location.getColumnNumber());
+      }
+    }
+
+    /**
+     * Emit exception warning message
+     */
+    public void warning(SAXParseException spe) {
+      if (LOG.isWarnEnabled()) {
+        LOG.warn("Warning: " + spe.toString() + ": " + spe.getMessage());
+      }
+    }
+  }
+
+  /**
+   * Iterate through all the items in this structured DMOZ file and print each
+   * qualifying URL to stdout.
+   */
+  public void parseDmozFile(File dmozFile, int subsetDenom,
+      boolean includeAdult, int skew, Pattern topicPattern)
+      throws IOException, SAXException, ParserConfigurationException {
+
+    SAXParserFactory parserFactory = SAXParserFactory.newInstance();
+    SAXParser parser = parserFactory.newSAXParser();
+    XMLReader reader = parser.getXMLReader();
+
+    // Create our own processor to receive SAX events
+    RDFProcessor rp = new RDFProcessor(reader, subsetDenom, includeAdult, skew,
+        topicPattern);
+    reader.setContentHandler(rp);
+    reader.setErrorHandler(rp);
+    LOG.info("skew = " + rp.hashSkew);
+
+    //
+    // Open filtered text stream. The XMLCharFilter makes sure that
+    // only appropriate XML-approved characters are received. Any
+    // non-conforming characters are silently replaced.
+    //
+    XMLCharFilter in = new XMLCharFilter(new BufferedReader(
+        new InputStreamReader(new BufferedInputStream(new FileInputStream(
+            dmozFile)), "UTF-8")));
+    try {
+      InputSource is = new InputSource(in);
+      reader.parse(is);
+    } catch (Exception e) {
+      if (LOG.isErrorEnabled()) {
+        LOG.error(e.toString());
+      }
+      System.exit(0);
+    } finally {
+      in.close();
+    }
+  }
+
+  private static void addTopicsFromFile(String topicFile, Vector<String> topics)
+      throws IOException {
+    BufferedReader in = null;
+    try {
+      in = new BufferedReader(new InputStreamReader(new FileInputStream(
+          topicFile), "UTF-8"));
+      String line = null;
+      while ((line = in.readLine()) != null) {
+        topics.addElement(line);
+      }
+    } catch (Exception e) {
+      if (LOG.isErrorEnabled()) {
+        LOG.error(e.toString());
+      }
+      System.exit(0);
+    } finally {
+      in.close();
+    }
+  }
+
+  /**
+   * Command-line access. Users may add URLs via a flat text file or the
+   * structured DMOZ file. By default, we ignore Adult material (as categorized
+   * by DMOZ).
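+   *
+   * <p>Example invocation (file name, denominator and topic are
+   * illustrative); qualifying URLs are printed to stdout:</p>
+   *
+   * <pre>{@code
+   * DmozParser content.rdf.u8 -subset 50 -topic Top/Business > dmoz/urls
+   * }</pre>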
+ */ + public static void main(String argv[]) throws Exception { + if (argv.length < 1) { + System.err + .println("Usage: DmozParser <dmoz_file> [-subset <subsetDenominator>] [-includeAdultMaterial] [-skew skew] [-topicFile <topic list file>] [-topic <topic> [-topic <topic> [...]]]"); + return; + } + + // + // Parse the command line, figure out what kind of + // URL file we need to load + // + int subsetDenom = 1; + int skew = 0; + String dmozFile = argv[0]; + boolean includeAdult = false; + Pattern topicPattern = null; + Vector<String> topics = new Vector<String>(); + + Configuration conf = NutchConfiguration.create(); + FileSystem fs = FileSystem.get(conf); + try { + for (int i = 1; i < argv.length; i++) { + if ("-includeAdultMaterial".equals(argv[i])) { + includeAdult = true; + } else if ("-subset".equals(argv[i])) { + subsetDenom = Integer.parseInt(argv[i + 1]); + i++; + } else if ("-topic".equals(argv[i])) { + topics.addElement(argv[i + 1]); + i++; + } else if ("-topicFile".equals(argv[i])) { + addTopicsFromFile(argv[i + 1], topics); + i++; + } else if ("-skew".equals(argv[i])) { + skew = Integer.parseInt(argv[i + 1]); + i++; + } + } + + DmozParser parser = new DmozParser(); + + if (!topics.isEmpty()) { + String regExp = new String("^("); + int j = 0; + for (; j < topics.size() - 1; ++j) { + regExp = regExp.concat(topics.get(j)); + regExp = regExp.concat("|"); + } + regExp = regExp.concat(topics.get(j)); + regExp = regExp.concat(").*"); + LOG.info("Topic selection pattern = " + regExp); + topicPattern = Pattern.compile(regExp); + } + + parser.parseDmozFile(new File(dmozFile), subsetDenom, includeAdult, skew, + topicPattern); + + } finally { + fs.close(); + } + } + +}
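A side note on main() above: the topic alternation assembled there with
repeated concat() calls can be written more compactly on Java 8 or later. A
minimal equivalent sketch:

    // Build "^(topic1|topic2|...).*" from the collected topics.
    String regExp = "^(" + String.join("|", topics) + ").*";
    Pattern topicPattern = Pattern.compile(regExp);

In either form the topic names are interpolated into the regular expression
verbatim, so a topic containing regex metacharacters would need quoting
(e.g. with Pattern.quote) before being added.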
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/FileDumper.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/FileDumper.java b/nutch-core/src/main/java/org/apache/nutch/tools/FileDumper.java new file mode 100644 index 0000000..b7c1805 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/tools/FileDumper.java @@ -0,0 +1,419 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nutch.tools; + +//JDK imports +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileFilter; +import java.io.FileOutputStream; +import java.io.ByteArrayInputStream; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import com.google.common.base.Strings; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; +//Commons imports +import org.apache.commons.io.IOUtils; +import org.apache.commons.io.FilenameUtils; +import org.apache.commons.codec.digest.DigestUtils; + +//Hadoop +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.util.StringUtils; +import org.apache.nutch.protocol.Content; +import org.apache.nutch.util.DumpFileUtil; +import org.apache.nutch.util.NutchConfiguration; +import org.apache.nutch.util.TableUtil; + +//Tika imports +import org.apache.tika.Tika; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * <p> + * The file dumper tool enables one to reverse generate the raw content from + * Nutch segment data directories. + * </p> + * <p> + * The tool has a number of immediate uses: + * <ol> + * <li>one can see what a page looked like at the time it was crawled</li> + * <li>one can see different media types acquired as part of the crawl</li> + * <li>it enables us to see webpages before we augment them with additional + * metadata, this can be handy for providing a provenance trail for your crawl + * data.</li> + * </ol> + * </p> + * <p> + * Upon successful completion the tool displays a very convenient JSON snippet + * detailing the mimetype classifications and the counts of documents which fall + * into those classifications. 
An example is as follows:
+ * </p>
+ *
+ * <pre>
+ * {@code
+ * INFO: File Types:
+ *   TOTAL Stats:
+ *    [
+ *     {"mimeType":"application/xml","count":"19"}
+ *     {"mimeType":"image/png","count":"47"}
+ *     {"mimeType":"image/jpeg","count":"141"}
+ *     {"mimeType":"image/vnd.microsoft.icon","count":"4"}
+ *     {"mimeType":"text/plain","count":"89"}
+ *     {"mimeType":"video/quicktime","count":"2"}
+ *     {"mimeType":"image/gif","count":"63"}
+ *     {"mimeType":"application/xhtml+xml","count":"1670"}
+ *     {"mimeType":"application/octet-stream","count":"40"}
+ *     {"mimeType":"text/html","count":"1863"}
+ *    ]
+ *
+ *   FILTER Stats:
+ *    [
+ *     {"mimeType":"image/png","count":"47"}
+ *     {"mimeType":"image/jpeg","count":"141"}
+ *     {"mimeType":"image/vnd.microsoft.icon","count":"4"}
+ *     {"mimeType":"video/quicktime","count":"2"}
+ *     {"mimeType":"image/gif","count":"63"}
+ *    ]
+ * }
+ * </pre>
+ * <p>
+ * In the case above, the tool would have been run with the <b>-mimeType
+ * image/png image/jpeg image/vnd.microsoft.icon video/quicktime image/gif</b>
+ * flag and corresponding values activated.
+ * </p>
+ */
+public class FileDumper {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FileDumper.class
+      .getName());
+
+  /**
+   * Dumps the reverse-engineered raw content from the provided segment
+   * directories. The root may be a parent directory containing more than one
+   * segment, or a single segment directory.
+   *
+   * @param outputDir
+   *          the directory you wish to dump the raw content to. This directory
+   *          will be created.
+   * @param segmentRootDir
+   *          a directory containing one or more segments.
+   * @param mimeTypes
+   *          an array of mime types to dump; all others will be filtered out.
+   * @param flatDir
+   *          a boolean flag specifying whether the output directory should
+   *          contain only files instead of using nested directories to
+   *          prevent naming conflicts.
+   * @param mimeTypeStats
+   *          a flag indicating whether mimetype stats should be displayed
+   *          instead of dumping files.
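+   * @param reverseURLDump
+   *          a flag indicating that dumped files should be laid out under
+   *          reversed-URL directories (e.g.
+   *          {@code com/example/<sha256-of-url>}) rather than the MD5-based
+   *          two-level directories.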
+ * @throws Exception + */ + public void dump(File outputDir, File segmentRootDir, String[] mimeTypes, boolean flatDir, boolean mimeTypeStats, boolean reverseURLDump) + throws Exception { + if (mimeTypes == null) + LOG.info("Accepting all mimetypes."); + // total file counts + Map<String, Integer> typeCounts = new HashMap<String, Integer>(); + // filtered file counts + Map<String, Integer> filteredCounts = new HashMap<String, Integer>(); + Configuration conf = NutchConfiguration.create(); + FileSystem fs = FileSystem.get(conf); + int fileCount = 0; + File[] segmentDirs = segmentRootDir.listFiles(new FileFilter() { + + @Override + public boolean accept(File file) { + return file.canRead() && file.isDirectory(); + } + }); + if (segmentDirs == null) { + LOG.error("No segment directories found in [" + + segmentRootDir.getAbsolutePath() + "]"); + return; + } + + for (File segment : segmentDirs) { + LOG.info("Processing segment: [" + segment.getAbsolutePath() + "]"); + DataOutputStream doutputStream = null; + + File segmentDir = new File(segment.getAbsolutePath(), Content.DIR_NAME); + File[] partDirs = segmentDir.listFiles(new FileFilter() { + @Override + public boolean accept(File file) { + return file.canRead() && file.isDirectory(); + } + }); + + if (partDirs == null) { + LOG.warn("Skipping Corrupt Segment: [{}]", segment.getAbsolutePath()); + continue; + } + + for (File partDir : partDirs) { + try { + String segmentPath = partDir + "/data"; + Path file = new Path(segmentPath); + if (!new File(file.toString()).exists()) { + LOG.warn("Skipping segment: [" + segmentPath + + "]: no data directory present"); + continue; + } + + SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(file)); + + Writable key = (Writable) reader.getKeyClass().newInstance(); + Content content = null; + + while (reader.next(key)) { + content = new Content(); + reader.getCurrentValue(content); + String url = key.toString(); + String baseName = FilenameUtils.getBaseName(url); + String extension = FilenameUtils.getExtension(url); + if (extension == null || (extension != null && extension.equals(""))) { + extension = "html"; + } + + String filename = baseName + "." + extension; + ByteArrayInputStream bas = null; + Boolean filter = false; + try { + bas = new ByteArrayInputStream(content.getContent()); + String mimeType = new Tika().detect(content.getContent()); + collectStats(typeCounts, mimeType); + if (mimeType != null) { + if (mimeTypes == null + || Arrays.asList(mimeTypes).contains(mimeType)) { + collectStats(filteredCounts, mimeType); + filter = true; + } + } + } catch (Exception e) { + e.printStackTrace(); + LOG.warn("Tika is unable to detect type for: [" + url + "]"); + } finally { + if (bas != null) { + try { + bas.close(); + } catch (Exception ignore) { + } + } + } + + if (filter) { + if (!mimeTypeStats) { + String md5Ofurl = DumpFileUtil.getUrlMD5(url); + + String fullDir = outputDir.getAbsolutePath(); + if (!flatDir && !reverseURLDump) { + fullDir = DumpFileUtil.createTwoLevelsDirectory(fullDir, md5Ofurl); + } + + if (!Strings.isNullOrEmpty(fullDir)) { + String outputFullPath; + + if (reverseURLDump) { + String[] reversedURL = TableUtil.reverseUrl(url).split(":"); + reversedURL[0] = reversedURL[0].replace('.', '/'); + + String reversedURLPath = reversedURL[0] + "/" + DigestUtils.sha256Hex(url).toUpperCase(); + outputFullPath = String.format("%s/%s", fullDir, reversedURLPath); + + // We'll drop the trailing file name and create the nested structure if it doesn't already exist. 
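+                  // Illustration (hypothetical URL): "http://example.com/page"
+                  // ends up at <fullDir>/com/example/<uppercase SHA-256 hex of the URL>.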
+ String[] splitPath = outputFullPath.split("/"); + File fullOutputDir = new File(org.apache.commons.lang3.StringUtils.join(Arrays.copyOf(splitPath, splitPath.length - 1), "/")); + + if (!fullOutputDir.exists()) { + fullOutputDir.mkdirs(); + } + } else { + outputFullPath = String.format("%s/%s", fullDir, DumpFileUtil.createFileName(md5Ofurl, baseName, extension)); + } + + File outputFile = new File(outputFullPath); + + if (!outputFile.exists()) { + LOG.info("Writing: [" + outputFullPath + "]"); + + // Modified to prevent FileNotFoundException (Invalid Argument) + FileOutputStream output = null; + try { + output = new FileOutputStream(outputFile); + IOUtils.write(content.getContent(), output); + } + catch (Exception e) { + LOG.warn("Write Error: [" + outputFullPath + "]"); + e.printStackTrace(); + } + finally { + if (output != null) { + output.flush(); + try { + output.close(); + } catch (Exception ignore) { + } + } + } + fileCount++; + } else { + LOG.info("Skipping writing: [" + outputFullPath + + "]: file already exists"); + } + } + } + } + } + reader.close(); + } finally { + fs.close(); + if (doutputStream != null) { + try { + doutputStream.close(); + } catch (Exception ignore) { + } + } + } + } + } + LOG.info("Dumper File Stats: " + + DumpFileUtil.displayFileTypes(typeCounts, filteredCounts)); + + if (mimeTypeStats) { + System.out.println("Dumper File Stats: " + + DumpFileUtil.displayFileTypes(typeCounts, filteredCounts)); + } + } + + /** + * Main method for invoking this tool + * + * @param args + * 1) output directory (which will be created) to host the raw data + * and 2) a directory containing one or more segments. + * @throws Exception + */ + public static void main(String[] args) throws Exception { + // boolean options + Option helpOpt = new Option("h", "help", false, "show this help message"); + // argument options + @SuppressWarnings("static-access") + Option outputOpt = OptionBuilder + .withArgName("outputDir") + .hasArg() + .withDescription( + "output directory (which will be created) to host the raw data") + .create("outputDir"); + @SuppressWarnings("static-access") + Option segOpt = OptionBuilder.withArgName("segment").hasArgs() + .withDescription("the segment(s) to use").create("segment"); + @SuppressWarnings("static-access") + Option mimeOpt = OptionBuilder + .withArgName("mimetype") + .hasArgs() + .withDescription( + "an optional list of mimetypes to dump, excluding all others. 
Defaults to all.") + .create("mimetype"); + @SuppressWarnings("static-access") + Option mimeStat = OptionBuilder + .withArgName("mimeStats") + .withDescription( + "only display mimetype stats for the segment(s) instead of dumping file.") + .create("mimeStats"); + @SuppressWarnings("static-access") + Option dirStructureOpt = OptionBuilder + .withArgName("flatdir") + .withDescription( + "optionally specify that the output directory should only contain files.") + .create("flatdir"); + @SuppressWarnings("static-access") + Option reverseURLOutput = OptionBuilder + .withArgName("reverseUrlDirs") + .withDescription( + "optionally specify to use reverse URL folders for output structure.") + .create("reverseUrlDirs"); + + // create the options + Options options = new Options(); + options.addOption(helpOpt); + options.addOption(outputOpt); + options.addOption(segOpt); + options.addOption(mimeOpt); + options.addOption(mimeStat); + options.addOption(dirStructureOpt); + options.addOption(reverseURLOutput); + + CommandLineParser parser = new GnuParser(); + try { + CommandLine line = parser.parse(options, args); + if (line.hasOption("help") || !line.hasOption("outputDir") + || (!line.hasOption("segment"))) { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp("FileDumper", options, true); + return; + } + + File outputDir = new File(line.getOptionValue("outputDir")); + File segmentRootDir = new File(line.getOptionValue("segment")); + String[] mimeTypes = line.getOptionValues("mimetype"); + boolean flatDir = line.hasOption("flatdir"); + boolean shouldDisplayStats = false; + if (line.hasOption("mimeStats")) + shouldDisplayStats = true; + boolean reverseURLDump = false; + if (line.hasOption("reverseUrlDirs")) + reverseURLDump = true; + + if (!outputDir.exists()) { + LOG.warn("Output directory: [" + outputDir.getAbsolutePath() + + "]: does not exist, creating it."); + if (!shouldDisplayStats) { + if (!outputDir.mkdirs()) + throw new Exception("Unable to create: [" + + outputDir.getAbsolutePath() + "]"); + } + } + + FileDumper dumper = new FileDumper(); + dumper.dump(outputDir, segmentRootDir, mimeTypes, flatDir, shouldDisplayStats, reverseURLDump); + } catch (Exception e) { + LOG.error("FileDumper: " + StringUtils.stringifyException(e)); + e.printStackTrace(); + return; + } + } + + private void collectStats(Map<String, Integer> typeCounts, String mimeType) { + typeCounts.put(mimeType, + typeCounts.containsKey(mimeType) ? typeCounts.get(mimeType) + 1 : 1); + } + +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/FreeGenerator.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/FreeGenerator.java b/nutch-core/src/main/java/org/apache/nutch/tools/FreeGenerator.java new file mode 100644 index 0000000..138372f --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/tools/FreeGenerator.java @@ -0,0 +1,214 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.tools;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.Generator;
+import org.apache.nutch.crawl.URLPartitioner;
+import org.apache.nutch.net.URLFilters;
+import org.apache.nutch.net.URLNormalizers;
+import org.apache.nutch.scoring.ScoringFilters;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.TimingUtil;
+
+/**
+ * This tool generates fetchlists (segments to be fetched) from plain text
+ * files containing one URL per line. It's useful when arbitrary URLs need to
+ * be fetched without adding them first to the CrawlDb, or during testing.
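+ *
+ * <p>Example invocation, matching the usage string in {@code run()} (paths
+ * are illustrative):</p>
+ *
+ * <pre>{@code
+ * FreeGenerator urls/ crawl/segments -filter -normalize
+ * }</pre>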
+ */ +public class FreeGenerator extends Configured implements Tool { + private static final Logger LOG = LoggerFactory + .getLogger(FreeGenerator.class); + + private static final String FILTER_KEY = "free.generator.filter"; + private static final String NORMALIZE_KEY = "free.generator.normalize"; + + public static class FG extends MapReduceBase implements + Mapper<WritableComparable<?>, Text, Text, Generator.SelectorEntry>, + Reducer<Text, Generator.SelectorEntry, Text, CrawlDatum> { + private URLNormalizers normalizers = null; + private URLFilters filters = null; + private ScoringFilters scfilters; + private CrawlDatum datum = new CrawlDatum(); + private Text url = new Text(); + private int defaultInterval = 0; + + @Override + public void configure(JobConf job) { + super.configure(job); + defaultInterval = job.getInt("db.fetch.interval.default", 0); + scfilters = new ScoringFilters(job); + if (job.getBoolean(FILTER_KEY, false)) { + filters = new URLFilters(job); + } + if (job.getBoolean(NORMALIZE_KEY, false)) { + normalizers = new URLNormalizers(job, URLNormalizers.SCOPE_INJECT); + } + } + + Generator.SelectorEntry entry = new Generator.SelectorEntry(); + + public void map(WritableComparable<?> key, Text value, + OutputCollector<Text, Generator.SelectorEntry> output, Reporter reporter) + throws IOException { + // value is a line of text + String urlString = value.toString(); + try { + if (normalizers != null) { + urlString = normalizers.normalize(urlString, + URLNormalizers.SCOPE_INJECT); + } + if (urlString != null && filters != null) { + urlString = filters.filter(urlString); + } + if (urlString != null) { + url.set(urlString); + scfilters.injectedScore(url, datum); + } + } catch (Exception e) { + LOG.warn("Error adding url '" + value.toString() + "', skipping: " + + StringUtils.stringifyException(e)); + return; + } + if (urlString == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("- skipping " + value.toString()); + } + return; + } + entry.datum = datum; + entry.url = url; + // https://issues.apache.org/jira/browse/NUTCH-1430 + entry.datum.setFetchInterval(defaultInterval); + output.collect(url, entry); + } + + public void reduce(Text key, Iterator<Generator.SelectorEntry> values, + OutputCollector<Text, CrawlDatum> output, Reporter reporter) + throws IOException { + // pick unique urls from values - discard the reduce key due to hash + // collisions + HashMap<Text, CrawlDatum> unique = new HashMap<Text, CrawlDatum>(); + while (values.hasNext()) { + Generator.SelectorEntry entry = values.next(); + unique.put(entry.url, entry.datum); + } + // output unique urls + for (Entry<Text, CrawlDatum> e : unique.entrySet()) { + output.collect(e.getKey(), e.getValue()); + } + } + } + + public int run(String[] args) throws Exception { + if (args.length < 2) { + System.err + .println("Usage: FreeGenerator <inputDir> <segmentsDir> [-filter] [-normalize]"); + System.err + .println("\tinputDir\tinput directory containing one or more input files."); + System.err + .println("\t\tEach text file contains a list of URLs, one URL per line"); + System.err + .println("\tsegmentsDir\toutput directory, where new segment will be created"); + System.err.println("\t-filter\trun current URLFilters on input URLs"); + System.err + .println("\t-normalize\trun current URLNormalizers on input URLs"); + return -1; + } + boolean filter = false; + boolean normalize = false; + if (args.length > 2) { + for (int i = 2; i < args.length; i++) { + if (args[i].equals("-filter")) { + filter = true; + } else if 
(args[i].equals("-normalize")) { + normalize = true; + } else { + LOG.error("Unknown argument: " + args[i] + ", exiting ..."); + return -1; + } + } + } + + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + long start = System.currentTimeMillis(); + LOG.info("FreeGenerator: starting at " + sdf.format(start)); + + JobConf job = new NutchJob(getConf()); + job.setBoolean(FILTER_KEY, filter); + job.setBoolean(NORMALIZE_KEY, normalize); + FileInputFormat.addInputPath(job, new Path(args[0])); + job.setInputFormat(TextInputFormat.class); + job.setMapperClass(FG.class); + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(Generator.SelectorEntry.class); + job.setPartitionerClass(URLPartitioner.class); + job.setReducerClass(FG.class); + String segName = Generator.generateSegmentName(); + job.setNumReduceTasks(job.getNumMapTasks()); + job.setOutputFormat(SequenceFileOutputFormat.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(CrawlDatum.class); + job.setOutputKeyComparatorClass(Generator.HashComparator.class); + FileOutputFormat.setOutputPath(job, new Path(args[1], new Path(segName, + CrawlDatum.GENERATE_DIR_NAME))); + try { + JobClient.runJob(job); + } catch (Exception e) { + LOG.error("FAILED: " + StringUtils.stringifyException(e)); + return -1; + } + long end = System.currentTimeMillis(); + LOG.info("FreeGenerator: finished at " + sdf.format(end) + ", elapsed: " + + TimingUtil.elapsedTime(start, end)); + return 0; + } + + public static void main(String[] args) throws Exception { + int res = ToolRunner.run(NutchConfiguration.create(), new FreeGenerator(), + args); + System.exit(res); + } +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/ResolveUrls.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/ResolveUrls.java b/nutch-core/src/main/java/org/apache/nutch/tools/ResolveUrls.java new file mode 100644 index 0000000..2b1c63b --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/tools/ResolveUrls.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nutch.tools; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.net.InetAddress; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.util.StringUtils; +import org.apache.nutch.util.URLUtil; + +/** + * A simple tool that will spin up multiple threads to resolve urls to ip + * addresses. This can be used to verify that pages that are failing due to + * UnknownHostException during fetching are actually bad and are not failing due + * to a dns problem in fetching. + */ +public class ResolveUrls { + + public static final Logger LOG = LoggerFactory.getLogger(ResolveUrls.class); + + private String urlsFile = null; + private int numThreads = 100; + private ExecutorService pool = null; + private static AtomicInteger numTotal = new AtomicInteger(0); + private static AtomicInteger numErrored = new AtomicInteger(0); + private static AtomicInteger numResolved = new AtomicInteger(0); + private static AtomicLong totalTime = new AtomicLong(0L); + + /** + * A Thread which gets the ip address of a single host by name. + */ + private static class ResolverThread extends Thread { + + private String url = null; + + public ResolverThread(String url) { + this.url = url; + } + + public void run() { + + numTotal.incrementAndGet(); + String host = URLUtil.getHost(url); + long start = System.currentTimeMillis(); + try { + + // get the address by name and if no error is thrown then it + // is resolved successfully + InetAddress.getByName(host); + LOG.info("Resolved: " + host); + numResolved.incrementAndGet(); + } catch (Exception uhe) { + LOG.info("Error Resolving: " + host); + numErrored.incrementAndGet(); + } + long end = System.currentTimeMillis(); + long total = (end - start); + totalTime.addAndGet(total); + LOG.info(", " + total + " millis"); + } + } + + /** + * Creates a thread pool for resolving urls. Reads in the url file on the + * local filesystem. For each url it attempts to resolve it keeping a total + * account of the number resolved, errored, and the amount of time. 
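+   *
+   * <p>The tool is normally driven from the command line, e.g. (file name
+   * illustrative): {@code ResolveUrls -urls urls.txt -numThreads 50}</p>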
+   */
+  public void resolveUrls() {
+
+    try {
+
+      // create a thread pool with a fixed number of threads
+      pool = Executors.newFixedThreadPool(numThreads);
+
+      // read in the urls file and loop through each line, one url per line
+      BufferedReader buffRead = new BufferedReader(new FileReader(new File(
+          urlsFile)));
+      String urlStr = null;
+      while ((urlStr = buffRead.readLine()) != null) {
+
+        // spin up a resolver thread per url
+        LOG.info("Starting: " + urlStr);
+        pool.execute(new ResolverThread(urlStr));
+      }
+
+      // close the file, stop accepting new tasks, and wait for up to 60
+      // seconds to give in-flight urls time to finish resolving
+      buffRead.close();
+      pool.shutdown();
+      pool.awaitTermination(60, TimeUnit.SECONDS);
+    } catch (Exception e) {
+
+      // on error shutdown the thread pool immediately
+      pool.shutdownNow();
+      LOG.info(StringUtils.stringifyException(e));
+    }
+
+    // log totals; guard the average against an empty input file
+    LOG.info("Total: " + numTotal.get() + ", Resolved: " + numResolved.get()
+        + ", Errored: " + numErrored.get() + ", Average Time: "
+        + totalTime.get() / Math.max(1, numTotal.get()));
+  }
+
+  /**
+   * Create a new ResolveUrls with a file from the local file system.
+   *
+   * @param urlsFile
+   *          The local urls file, one url per line.
+   */
+  public ResolveUrls(String urlsFile) {
+    this(urlsFile, 100);
+  }
+
+  /**
+   * Create a new ResolveUrls with a urls file and a number of threads for the
+   * thread pool. The number of threads is 100 by default.
+   *
+   * @param urlsFile
+   *          The local urls file, one url per line.
+   * @param numThreads
+   *          The number of threads used to resolve urls in parallel.
+   */
+  public ResolveUrls(String urlsFile, int numThreads) {
+    this.urlsFile = urlsFile;
+    this.numThreads = numThreads;
+  }
+
+  /**
+   * Runs the resolve urls tool.
+   */
+  public static void main(String[] args) {
+
+    Options options = new Options();
+    OptionBuilder.withArgName("help");
+    OptionBuilder.withDescription("show this help message");
+    Option helpOpts = OptionBuilder.create("help");
+    options.addOption(helpOpts);
+
+    OptionBuilder.withArgName("urls");
+    OptionBuilder.hasArg();
+    OptionBuilder.withDescription("the urls file to check");
+    Option urlOpts = OptionBuilder.create("urls");
+    options.addOption(urlOpts);
+
+    OptionBuilder.withArgName("numThreads");
+    OptionBuilder.hasArgs();
+    OptionBuilder.withDescription("the number of threads to use");
+    Option numThreadOpts = OptionBuilder.create("numThreads");
+    options.addOption(numThreadOpts);
+
+    CommandLineParser parser = new GnuParser();
+    try {
+      // parse out the command line arguments
+      CommandLine line = parser.parse(options, args);
+      if (line.hasOption("help") || !line.hasOption("urls")) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp("ResolveUrls", options);
+        return;
+      }
+
+      // get the urls and the number of threads and start the resolver
+      String urls = line.getOptionValue("urls");
+      int numThreads = 100;
+      String numThreadsStr = line.getOptionValue("numThreads");
+      if (numThreadsStr != null) {
+        numThreads = Integer.parseInt(numThreadsStr);
+      }
+      ResolveUrls resolve = new ResolveUrls(urls, numThreads);
+      resolve.resolveUrls();
+    } catch (Exception e) {
+      LOG.error("ResolveUrls: " + StringUtils.stringifyException(e));
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/WARCUtils.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/WARCUtils.java
b/nutch-core/src/main/java/org/apache/nutch/tools/WARCUtils.java new file mode 100644 index 0000000..d8ae0b3 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/tools/WARCUtils.java @@ -0,0 +1,154 @@ +package org.apache.nutch.tools; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Date; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.nutch.indexer.NutchDocument; +import org.apache.nutch.util.StringUtil; +import org.archive.format.http.HttpHeaders; +import org.archive.format.warc.WARCConstants; +import org.archive.io.warc.WARCRecordInfo; +import org.archive.uid.UUIDGenerator; +import org.archive.util.DateUtils; +import org.archive.util.anvl.ANVLRecord; + +public class WARCUtils { + public final static String SOFTWARE = "software"; + public final static String HTTP_HEADER_FROM = "http-header-from"; + public final static String HTTP_HEADER_USER_AGENT = "http-header-user-agent"; + public final static String HOSTNAME = "hostname"; + public final static String ROBOTS = "robots"; + public final static String OPERATOR = "operator"; + public final static String FORMAT = "format"; + public final static String CONFORMS_TO = "conformsTo"; + public final static String IP = "ip"; + public final static UUIDGenerator generator = new UUIDGenerator(); + + public static final ANVLRecord getWARCInfoContent(Configuration conf) { + ANVLRecord record = new ANVLRecord(); + + // informative headers + record.addLabelValue(FORMAT, "WARC File Format 1.0"); + record.addLabelValue(CONFORMS_TO, "http://bibnum.bnf.fr/WARC/WARC_ISO_28500_version1_latestdraft.pdf"); + + record.addLabelValue(SOFTWARE, conf.get("http.agent.name", "")); + record.addLabelValue(HTTP_HEADER_USER_AGENT, + getAgentString(conf.get("http.agent.name", ""), + conf.get("http.agent.version", ""), + conf.get("http.agent.description", ""), + conf.get("http.agent.url", ""), + conf.get("http.agent.email", ""))); + record.addLabelValue(HTTP_HEADER_FROM, + conf.get("http.agent.email", "")); + + try { + record.addLabelValue(HOSTNAME, getHostname(conf)); + record.addLabelValue(IP, getIPAddress(conf)); + } catch (UnknownHostException ignored) { + // do nothing as this fields are optional + } + + record.addLabelValue(ROBOTS, "classic"); // TODO Make configurable? + record.addLabelValue(OPERATOR, conf.get("http.agent.email", "")); + + return record; + } + + public static final String getHostname(Configuration conf) + throws UnknownHostException { + + return StringUtil.isEmpty(conf.get("http.agent.host", "")) ? 
+ InetAddress.getLocalHost().getHostName() : + conf.get("http.agent.host"); + } + + public static final String getIPAddress(Configuration conf) + throws UnknownHostException { + + return InetAddress.getLocalHost().getHostAddress(); + } + + public static final byte[] toByteArray(HttpHeaders headers) + throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + headers.write(out); + + return out.toByteArray(); + } + + public static final String getAgentString(String name, String version, + String description, String URL, String email) { + + StringBuffer buf = new StringBuffer(); + + buf.append(name); + + if (version != null) { + buf.append("/").append(version); + } + + if (((description != null) && (description.length() != 0)) || ( + (email != null) && (email.length() != 0)) || ((URL != null) && ( + URL.length() != 0))) { + buf.append(" ("); + + if ((description != null) && (description.length() != 0)) { + buf.append(description); + if ((URL != null) || (email != null)) + buf.append("; "); + } + + if ((URL != null) && (URL.length() != 0)) { + buf.append(URL); + if (email != null) + buf.append("; "); + } + + if ((email != null) && (email.length() != 0)) + buf.append(email); + + buf.append(")"); + } + + return buf.toString(); + } + + public static final WARCRecordInfo docToMetadata(NutchDocument doc) + throws UnsupportedEncodingException { + WARCRecordInfo record = new WARCRecordInfo(); + + record.setType(WARCConstants.WARCRecordType.metadata); + record.setUrl((String) doc.getFieldValue("id")); + record.setCreate14DigitDate( + DateUtils.get14DigitDate((Date) doc.getFieldValue("tstamp"))); + record.setMimetype("application/warc-fields"); + record.setRecordId(generator.getRecordID()); + + // metadata + ANVLRecord metadata = new ANVLRecord(); + + for (String field : doc.getFieldNames()) { + List<Object> values = doc.getField(field).getValues(); + for (Object value : values) { + if (value instanceof Date) { + metadata.addLabelValue(field, DateUtils.get14DigitDate()); + } else { + metadata.addLabelValue(field, (String) value); + } + } + } + + record.setContentLength(metadata.getLength()); + record.setContentStream( + new ByteArrayInputStream(metadata.getUTF8Bytes())); + + return record; + } +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/arc/ArcInputFormat.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/arc/ArcInputFormat.java b/nutch-core/src/main/java/org/apache/nutch/tools/arc/ArcInputFormat.java new file mode 100644 index 0000000..0eb7bf6 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/tools/arc/ArcInputFormat.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.tools.arc;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * An input format that reads arc files.
+ */
+public class ArcInputFormat extends FileInputFormat<Text, BytesWritable> {
+
+  /**
+   * Returns the <code>RecordReader</code> for reading the arc file.
+   *
+   * @param split
+   *          The InputSplit of the arc file to process.
+   * @param job
+   *          The job configuration.
+   * @param reporter
+   *          The progress reporter.
+   */
+  public RecordReader<Text, BytesWritable> getRecordReader(InputSplit split,
+      JobConf job, Reporter reporter) throws IOException {
+    reporter.setStatus(split.toString());
+    return new ArcRecordReader(job, (FileSplit) split);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/arc/ArcRecordReader.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/arc/ArcRecordReader.java b/nutch-core/src/main/java/org/apache/nutch/tools/arc/ArcRecordReader.java
new file mode 100644
index 0000000..e9ff58d
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/tools/arc/ArcRecordReader.java
@@ -0,0 +1,299 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.tools.arc;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.zip.GZIPInputStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * <p>
+ * The <code>ArcRecordReader</code> class provides a record reader which reads
+ * records from arc files.
+ * </p>
+ *
+ * <p>
+ * Each record in an arc file is an individually gzipped document, and the
+ * compressed records are concatenated together to form a complete arc file.
+ * For more information on the arc file format, see
+ * <a href="http://www.archive.org/web/researcher/ArcFileFormat.php">the arc
+ * file format description</a>.
+ * </p>
+ *
+ * <p>
+ * Arc files are used by the internet archive and grub projects.
+ * </p>
+ *
+ * @see <a href="http://www.archive.org/">http://www.archive.org/</a>
+ * @see <a href="http://www.grub.org/">http://www.grub.org/</a>
+ */
+public class ArcRecordReader implements RecordReader<Text, BytesWritable> {
+
+  public static final Logger LOG = LoggerFactory
+      .getLogger(ArcRecordReader.class);
+
+  protected Configuration conf;
+  protected long splitStart = 0;
+  protected long pos = 0;
+  protected long splitEnd = 0;
+  protected long splitLen = 0;
+  protected long fileLen = 0;
+  protected FSDataInputStream in;
+
+  private static final byte[] MAGIC = { (byte) 0x1F, (byte) 0x8B };
+
+  /**
+   * <p>
+   * Returns true if the byte array passed matches the gzip header magic
+   * number.
+   * </p>
+   *
+   * @param input
+   *          The byte array to check.
+   *
+   * @return True if the byte array matches the gzip header magic number.
+   */
+  public static boolean isMagic(byte[] input) {
+
+    // check for null and incorrect length
+    if (input == null || input.length != MAGIC.length) {
+      return false;
+    }
+
+    // check byte by byte
+    for (int i = 0; i < MAGIC.length; i++) {
+      if (MAGIC[i] != input[i]) {
+        return false;
+      }
+    }
+
+    // must match
+    return true;
+  }
+
+  /**
+   * Constructor that sets the configuration and file split.
+   *
+   * @param conf
+   *          The job configuration.
+   * @param split
+   *          The file split to read from.
+   *
+   * @throws IOException
+   *           If an IO error occurs while initializing the file split.
+   */
+  public ArcRecordReader(Configuration conf, FileSplit split)
+      throws IOException {
+
+    Path path = split.getPath();
+    FileSystem fs = path.getFileSystem(conf);
+    fileLen = fs.getFileStatus(split.getPath()).getLen();
+    this.conf = conf;
+    this.in = fs.open(split.getPath());
+    this.splitStart = split.getStart();
+    this.splitEnd = splitStart + split.getLength();
+    this.splitLen = split.getLength();
+    in.seek(splitStart);
+  }
+
+  /**
+   * Closes the record reader resources.
+   */
+  public void close() throws IOException {
+    this.in.close();
+  }
+
+  /**
+   * Creates a new instance of the <code>Text</code> object for the key.
+   */
+  public Text createKey() {
+    return ReflectionUtils.newInstance(Text.class, conf);
+  }
+
+  /**
+   * Creates a new instance of the <code>BytesWritable</code> object for the
+   * value.
+   */
+  public BytesWritable createValue() {
+    return ReflectionUtils.newInstance(BytesWritable.class, conf);
+  }
+
+  /**
+   * Returns the current position in the file.
+   *
+   * @return The long of the current position in the file.
+   */
+  public long getPos() throws IOException {
+    return in.getPos();
+  }
+
+  /**
+   * Returns the percentage of progress in processing the file. This will be
+   * represented as a float from 0 to 1, with 1 being 100% completed.
+   *
+   * @return The percentage of progress as a float from 0 to 1.
+   */
+  public float getProgress() throws IOException {
+
+    // if we haven't even started
+    if (splitEnd == splitStart) {
+      return 0.0f;
+    } else {
+      // the progress is current pos - where we started / length of the split
+      return Math.min(1.0f, (getPos() - splitStart) / (float) splitLen);
+    }
+  }
+
+  /**
+   * <p>
+   * Returns true if the next record in the split is read into the key and
+   * value pair. The key will be the arc record header and the value will be
+   * the raw content bytes of the arc record.
+   * </p>
+   *
+   * @param key
+   *          The record key
+   * @param value
+   *          The record value
+   *
+   * @return True if the next record is read.
+   *
+   * @throws IOException
+   *           If an error occurs while reading the record value.
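+   *
+   * <p>For reference, an ARC version-1 URL-record header line (the text that
+   * ends up in the key) looks roughly like the following; the exact field
+   * list is defined by the ARC format and the values here are illustrative:
+   * </p>
+   *
+   * <pre>{@code
+   * http://example.com/ 192.0.2.1 20060101000000 text/html 9271
+   * }</pre>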
+ */ + public boolean next(Text key, BytesWritable value) throws IOException { + + try { + + // get the starting position on the input stream + long startRead = in.getPos(); + byte[] magicBuffer = null; + + // we need this loop to handle false positives in reading of gzip records + while (true) { + + // while we haven't passed the end of the split + if (startRead >= splitEnd) { + return false; + } + + // scanning for the gzip header + boolean foundStart = false; + while (!foundStart) { + + // start at the current file position and scan for 1K at time, break + // if there is no more to read + startRead = in.getPos(); + magicBuffer = new byte[1024]; + int read = in.read(magicBuffer); + if (read < 0) { + break; + } + + // scan the byte array for the gzip header magic number. This happens + // byte by byte + for (int i = 0; i < read - 1; i++) { + byte[] testMagic = new byte[2]; + System.arraycopy(magicBuffer, i, testMagic, 0, 2); + if (isMagic(testMagic)) { + // set the next start to the current gzip header + startRead += i; + foundStart = true; + break; + } + } + } + + // seek to the start of the gzip header + in.seek(startRead); + ByteArrayOutputStream baos = null; + int totalRead = 0; + + try { + + // read 4K of the gzip at a time putting into a byte array + byte[] buffer = new byte[4096]; + GZIPInputStream zin = new GZIPInputStream(in); + int gzipRead = -1; + baos = new ByteArrayOutputStream(); + while ((gzipRead = zin.read(buffer, 0, buffer.length)) != -1) { + baos.write(buffer, 0, gzipRead); + totalRead += gzipRead; + } + } catch (Exception e) { + + // there are times we get false positives where the gzip header exists + // but it is not an actual gzip record, so we ignore it and start + // over seeking + System.out.println("Ignoring position: " + (startRead)); + if (startRead + 1 < fileLen) { + in.seek(startRead + 1); + } + continue; + } + + // change the output stream to a byte array + byte[] content = baos.toByteArray(); + + // the first line of the raw content in arc files is the header + int eol = 0; + for (int i = 0; i < content.length; i++) { + if (i > 0 && content[i] == '\n') { + eol = i; + break; + } + } + + // create the header and the raw content minus the header + String header = new String(content, 0, eol).trim(); + byte[] raw = new byte[(content.length - eol) - 1]; + System.arraycopy(content, eol + 1, raw, 0, raw.length); + + // populate key and values with the header and raw content. + Text keyText = key; + keyText.set(header); + BytesWritable valueBytes = value; + valueBytes.set(raw, 0, raw.length); + + // TODO: It would be best to start at the end of the gzip read but + // the bytes read in gzip don't match raw bytes in the file so we + // overshoot the next header. With this current method you get + // some false positives but don't miss records. 
+        if (startRead + 1 < fileLen) {
+          in.seek(startRead + 1);
+        }
+
+        // populated the record, now return
+        return true;
+      }
+    } catch (Exception e) {
+      LOG.error(StringUtils.stringifyException(e));
+    }
+
+    // couldn't populate the record or there is no next record to read
+    return false;
+  }
+}
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/arc/ArcSegmentCreator.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/arc/ArcSegmentCreator.java b/nutch-core/src/main/java/org/apache/nutch/tools/arc/ArcSegmentCreator.java
new file mode 100644
index 0000000..39b8d95
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/tools/arc/ArcSegmentCreator.java
@@ -0,0 +1,426 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.tools.arc;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Map.Entry;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.NutchWritable;
+import org.apache.nutch.crawl.SignatureFactory;
+import org.apache.nutch.fetcher.FetcherOutputFormat;
+import org.apache.nutch.metadata.Metadata;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.net.URLFilters;
+import org.apache.nutch.net.URLNormalizers;
+import org.apache.nutch.parse.Parse;
+import org.apache.nutch.parse.ParseImpl;
+import org.apache.nutch.parse.ParseResult;
+import org.apache.nutch.parse.ParseStatus;
+import org.apache.nutch.parse.ParseText;
+import org.apache.nutch.parse.ParseUtil;
+import org.apache.nutch.protocol.Content;
+import org.apache.nutch.protocol.ProtocolStatus;
+import org.apache.nutch.scoring.ScoringFilters;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.StringUtil;
+import org.apache.nutch.util.TimingUtil;
+
+/**
+ * <p>
+ * The <code>ArcSegmentCreator</code> is a replacement for the fetcher that
+ * will take arc files as input
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/arc/ArcSegmentCreator.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/arc/ArcSegmentCreator.java b/nutch-core/src/main/java/org/apache/nutch/tools/arc/ArcSegmentCreator.java
new file mode 100644
index 0000000..39b8d95
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/tools/arc/ArcSegmentCreator.java
@@ -0,0 +1,426 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.tools.arc;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Map.Entry;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.NutchWritable;
+import org.apache.nutch.crawl.SignatureFactory;
+import org.apache.nutch.fetcher.FetcherOutputFormat;
+import org.apache.nutch.metadata.Metadata;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.net.URLFilters;
+import org.apache.nutch.net.URLNormalizers;
+import org.apache.nutch.parse.Parse;
+import org.apache.nutch.parse.ParseImpl;
+import org.apache.nutch.parse.ParseResult;
+import org.apache.nutch.parse.ParseStatus;
+import org.apache.nutch.parse.ParseText;
+import org.apache.nutch.parse.ParseUtil;
+import org.apache.nutch.protocol.Content;
+import org.apache.nutch.protocol.ProtocolStatus;
+import org.apache.nutch.scoring.ScoringFilters;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.StringUtil;
+import org.apache.nutch.util.TimingUtil;
+
+/**
+ * <p>
+ * The <code>ArcSegmentCreator</code> is a replacement for the fetcher that
+ * takes arc files as input and produces a Nutch segment as output.
+ * </p>
+ *
+ * <p>
+ * Arc files are concatenations of individually gzip-compressed records,
+ * produced by both the Internet Archive project and the Grub distributed
+ * crawler project.
+ * </p>
+ */
+public class ArcSegmentCreator extends Configured implements Tool,
+    Mapper<Text, BytesWritable, Text, NutchWritable> {
+
+  public static final Logger LOG = LoggerFactory
+      .getLogger(ArcSegmentCreator.class);
+  public static final String URL_VERSION = "arc.url.version";
+  private JobConf jobConf;
+  private URLFilters urlFilters;
+  private ScoringFilters scfilters;
+  private ParseUtil parseUtil;
+  private URLNormalizers normalizers;
+  private int interval;
+
+  private static SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
+
+  public ArcSegmentCreator() {
+
+  }
+
+  /**
+   * <p>
+   * Constructor that sets the job configuration.
+   * </p>
+   *
+   * @param conf
+   *          The job configuration.
+   */
+  public ArcSegmentCreator(Configuration conf) {
+    setConf(conf);
+  }
+
+  /**
+   * Generates a timestamp-based name for the segments.
+   *
+   * @return The generated segment name.
+   */
+  public static synchronized String generateSegmentName() {
+    try {
+      // sleep for a full second so two consecutive calls can never fall into
+      // the same yyyyMMddHHmmss bucket
+      Thread.sleep(1000);
+    } catch (Throwable t) {
+    }
+    return sdf.format(new Date(System.currentTimeMillis()));
+  }
+
+  /**
+   * <p>
+   * Configures the job. Sets the url filters, scoring filters, url normalizers
+   * and other relevant data.
+   * </p>
+   *
+   * @param job
+   *          The job configuration.
+   */
+  public void configure(JobConf job) {
+
+    // set the url filters, scoring filters, the parse util and the url
+    // normalizers
+    this.jobConf = job;
+    this.urlFilters = new URLFilters(jobConf);
+    this.scfilters = new ScoringFilters(jobConf);
+    this.parseUtil = new ParseUtil(jobConf);
+    this.normalizers = new URLNormalizers(jobConf, URLNormalizers.SCOPE_FETCHER);
+    interval = jobConf.getInt("db.fetch.interval.default", 2592000);
+  }
+
+  public void close() {
+  }
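generateSegmentName() above buys uniqueness by sleeping for a full second between calls. If that pause ever mattered, one alternative is to clamp each new name at least one second past the previous one instead of blocking; the helper below is a sketch under that assumption, not part of the patch, and it can hand out names slightly ahead of the wall clock when called in quick succession:

    import java.text.SimpleDateFormat;
    import java.util.Date;

    /** Hypothetical variant of generateSegmentName() that never sleeps. */
    class SegmentNames {
      private static final SimpleDateFormat SDF =
          new SimpleDateFormat("yyyyMMddHHmmss");
      private static long last = 0;

      static synchronized String next() {
        long now = System.currentTimeMillis();
        // advance at least one full second past the previous name so the
        // second-granularity pattern above can never produce a duplicate
        long stamp = Math.max(now, last + 1000);
        last = stamp;
        return SDF.format(new Date(stamp));
      }

      public static void main(String[] args) {
        System.out.println(next()); // e.g. 20170101120000
        System.out.println(next()); // always strictly later, no sleep
      }
    }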
+  /**
+   * <p>
+   * Parses the raw content of a single record to create output. This method is
+   * almost the same as the {@link org.apache.nutch.fetcher.Fetcher#output}
+   * method in terms of processing and output.
+   * </p>
+   *
+   * @param output
+   *          The job output collector.
+   * @param segmentName
+   *          The name of the segment to create.
+   * @param key
+   *          The url of the record.
+   * @param datum
+   *          The CrawlDatum of the record.
+   * @param content
+   *          The raw content of the record.
+   * @param pstatus
+   *          The protocol status.
+   * @param status
+   *          The fetch status.
+   *
+   * @return The result of the parse in a ParseStatus object.
+   */
+  private ParseStatus output(OutputCollector<Text, NutchWritable> output,
+      String segmentName, Text key, CrawlDatum datum, Content content,
+      ProtocolStatus pstatus, int status) {
+
+    // set the fetch status and the fetch time
+    datum.setStatus(status);
+    datum.setFetchTime(System.currentTimeMillis());
+    if (pstatus != null)
+      datum.getMetaData().put(Nutch.WRITABLE_PROTO_STATUS_KEY, pstatus);
+
+    ParseResult parseResult = null;
+    if (content != null) {
+      Metadata metadata = content.getMetadata();
+      // add segment to metadata
+      metadata.set(Nutch.SEGMENT_NAME_KEY, segmentName);
+      // add score to content metadata so that ParseSegment can pick it up
+      try {
+        scfilters.passScoreBeforeParsing(key, datum, content);
+      } catch (Exception e) {
+        if (LOG.isWarnEnabled()) {
+          LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
+        }
+      }
+
+      try {
+
+        // parse the content
+        parseResult = this.parseUtil.parse(content);
+      } catch (Exception e) {
+        LOG.warn("Error parsing: " + key + ": "
+            + StringUtils.stringifyException(e));
+      }
+
+      // set the content signature
+      if (parseResult == null) {
+        byte[] signature = SignatureFactory.getSignature(getConf()).calculate(
+            content, new ParseStatus().getEmptyParse(getConf()));
+        datum.setSignature(signature);
+      }
+
+      try {
+        output.collect(key, new NutchWritable(datum));
+        output.collect(key, new NutchWritable(content));
+
+        if (parseResult != null) {
+          for (Entry<Text, Parse> entry : parseResult) {
+            Text url = entry.getKey();
+            Parse parse = entry.getValue();
+            ParseStatus parseStatus = parse.getData().getStatus();
+
+            if (!parseStatus.isSuccess()) {
+              LOG.warn("Error parsing: " + key + ": " + parseStatus);
+              parse = parseStatus.getEmptyParse(getConf());
+            }
+
+            // calculate the page signature
+            byte[] signature = SignatureFactory.getSignature(getConf())
+                .calculate(content, parse);
+            // ensure segment name and score are in parseData metadata
+            parse.getData().getContentMeta()
+                .set(Nutch.SEGMENT_NAME_KEY, segmentName);
+            parse.getData().getContentMeta()
+                .set(Nutch.SIGNATURE_KEY, StringUtil.toHexString(signature));
+            // pass the fetch time on to the content meta
+            parse.getData().getContentMeta()
+                .set(Nutch.FETCH_TIME_KEY, Long.toString(datum.getFetchTime()));
+            if (url.equals(key))
+              datum.setSignature(signature);
+            try {
+              scfilters.passScoreAfterParsing(url, content, parse);
+            } catch (Exception e) {
+              if (LOG.isWarnEnabled()) {
+                LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
+              }
+            }
+            output.collect(url, new NutchWritable(new ParseImpl(new ParseText(
+                parse.getText()), parse.getData(), parse.isCanonical())));
+          }
+        }
+      } catch (IOException e) {
+        if (LOG.isErrorEnabled()) {
+          LOG.error("ArcSegmentCreator caught: "
+              + StringUtils.stringifyException(e));
+        }
+      }
+
+      // return the parse status if it exists
+      if (parseResult != null && !parseResult.isEmpty()) {
+        Parse p = parseResult.get(content.getUrl());
+        if (p != null) {
+          return p.getData().getStatus();
+        }
+      }
+    }
+
+    return null;
+  }
+
+  /**
+   * <p>
+   * Logs any error that occurs during conversion.
+   * </p>
+   *
+   * @param url
+   *          The url we are parsing.
+   * @param t
+   *          The error that occurred.
+   */
+  private void logError(Text url, Throwable t) {
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Conversion of " + url + " failed with: "
+          + StringUtils.stringifyException(t));
+    }
+  }
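The map() method below starts by splitting the record key — the ARC header line that ArcRecordReader emitted — on whitespace. A small sketch of that split with an invented header line (the sample values are made up, and the field layout is inferred from the indices the code uses):

    public class HeaderSplitExample {
      public static void main(String[] args) {
        // an invented ARC-style header line, purely for illustration
        String header =
            "http://example.com/ 192.0.2.1 20050101000000 text/html 1024";
        String[] headers = header.split("\\s+");
        System.out.println(headers[0]); // urlStr      -> http://example.com/
        System.out.println(headers[2]); // version     -> 20050101000000
        System.out.println(headers[3]); // contentType -> text/html
      }
    }

Records whose URL begins with filedesc:// carry the ARC file's own description rather than fetched content, which is why the first branch of map() skips them.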
+  /**
+   * <p>
+   * Runs the Map job to translate an arc record into output for Nutch
+   * segments.
+   * </p>
+   *
+   * @param key
+   *          The arc record header.
+   * @param bytes
+   *          The arc record raw content bytes.
+   * @param output
+   *          The output collector.
+   * @param reporter
+   *          The progress reporter.
+   */
+  public void map(Text key, BytesWritable bytes,
+      OutputCollector<Text, NutchWritable> output, Reporter reporter)
+      throws IOException {
+
+    String[] headers = key.toString().split("\\s+");
+    String urlStr = headers[0];
+    String version = headers[2];
+    String contentType = headers[3];
+
+    // arcs start with a file description; we ignore it as it is not a
+    // content record
+    if (urlStr.startsWith("filedesc://")) {
+      LOG.info("Ignoring file header: " + urlStr);
+      return;
+    }
+    LOG.info("Processing: " + urlStr);
+
+    // get the raw bytes from the arc file, create a new crawldatum
+    Text url = new Text();
+    CrawlDatum datum = new CrawlDatum(CrawlDatum.STATUS_DB_FETCHED, interval,
+        1.0f);
+    String segmentName = getConf().get(Nutch.SEGMENT_NAME_KEY);
+
+    // normalize and filter the urls
+    try {
+      urlStr = normalizers.normalize(urlStr, URLNormalizers.SCOPE_FETCHER);
+      urlStr = urlFilters.filter(urlStr); // filter the url
+    } catch (Exception e) {
+      if (LOG.isWarnEnabled()) {
+        LOG.warn("Skipping " + urlStr + ": " + e);
+      }
+      urlStr = null;
+    }
+
+    // if still a good url then process
+    if (urlStr != null) {
+
+      url.set(urlStr);
+      try {
+
+        // set the protocol status and the crawl status to success and create
+        // the content from the normalized url and the raw bytes from the arc
+        // file. TODO: currently this doesn't handle the text of error pages
+        // (e.g. 404s); we assume we won't get those.
+        ProtocolStatus status = ProtocolStatus.STATUS_SUCCESS;
+        Content content = new Content(urlStr, urlStr, bytes.getBytes(),
+            contentType, new Metadata(), getConf());
+
+        // set the url version into the metadata
+        content.getMetadata().set(URL_VERSION, version);
+        ParseStatus pstatus = null;
+        pstatus = output(output, segmentName, url, datum, content, status,
+            CrawlDatum.STATUS_FETCH_SUCCESS);
+        reporter.progress();
+      } catch (Throwable t) { // unexpected exception
+        logError(url, t);
+        output(output, segmentName, url, datum, null, null,
+            CrawlDatum.STATUS_FETCH_RETRY);
+      }
+    }
+  }
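map() pushes every URL through the normalizer and filter chain before accepting it, treating a null return or an exception as a rejection. The same pattern in isolation, using the APIs the patch itself imports (the input URL is invented):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.nutch.net.URLFilters;
    import org.apache.nutch.net.URLNormalizers;
    import org.apache.nutch.util.NutchConfiguration;

    public class NormalizeFilterExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = NutchConfiguration.create();
        URLNormalizers normalizers =
            new URLNormalizers(conf, URLNormalizers.SCOPE_FETCHER);
        URLFilters filters = new URLFilters(conf);

        String urlStr = "HTTP://Example.COM/./page.html"; // invented input
        try {
          urlStr = normalizers.normalize(urlStr, URLNormalizers.SCOPE_FETCHER);
          urlStr = filters.filter(urlStr); // null means some filter rejected it
        } catch (Exception e) {
          urlStr = null;
        }
        System.out.println(urlStr != null ? "kept: " + urlStr : "rejected");
      }
    }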
+  /**
+   * <p>
+   * Creates and runs the job that converts arc files into Nutch segments.
+   * </p>
+   *
+   * @param arcFiles
+   *          The path to the directory holding the arc files.
+   * @param segmentsOutDir
+   *          The output directory for writing the segments.
+   *
+   * @throws IOException
+   *           If an IO error occurs while running the job.
+   */
+  public void createSegments(Path arcFiles, Path segmentsOutDir)
+      throws IOException {
+
+    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    long start = System.currentTimeMillis();
+    if (LOG.isInfoEnabled()) {
+      LOG.info("ArcSegmentCreator: starting at " + sdf.format(start));
+      LOG.info("ArcSegmentCreator: arc files dir: " + arcFiles);
+    }
+
+    JobConf job = new NutchJob(getConf());
+    job.setJobName("ArcSegmentCreator " + arcFiles);
+    String segName = generateSegmentName();
+    job.set(Nutch.SEGMENT_NAME_KEY, segName);
+    FileInputFormat.addInputPath(job, arcFiles);
+    job.setInputFormat(ArcInputFormat.class);
+    job.setMapperClass(ArcSegmentCreator.class);
+    FileOutputFormat.setOutputPath(job, new Path(segmentsOutDir, segName));
+    job.setOutputFormat(FetcherOutputFormat.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(NutchWritable.class);
+
+    JobClient.runJob(job);
+
+    long end = System.currentTimeMillis();
+    LOG.info("ArcSegmentCreator: finished at " + sdf.format(end)
+        + ", elapsed: " + TimingUtil.elapsedTime(start, end));
+  }
+
+  public static void main(String args[]) throws Exception {
+    int res = ToolRunner.run(NutchConfiguration.create(),
+        new ArcSegmentCreator(), args);
+    System.exit(res);
+  }
+
+  public int run(String[] args) throws Exception {
+
+    String usage = "Usage: ArcSegmentCreator <arcFiles> <segmentsOutDir>";
+
+    if (args.length < 2) {
+      System.err.println(usage);
+      return -1;
+    }
+
+    // set the arc files directory and the segments output directory
+    Path arcFiles = new Path(args[0]);
+    Path segmentsOutDir = new Path(args[1]);
+
+    try {
+      // create the segments from the arc files
+      createSegments(arcFiles, segmentsOutDir);
+      return 0;
+    } catch (Exception e) {
+      LOG.error("ArcSegmentCreator: " + StringUtils.stringifyException(e));
+      return -1;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/arc/package-info.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/arc/package-info.java b/nutch-core/src/main/java/org/apache/nutch/tools/arc/package-info.java
new file mode 100644
index 0000000..cb6e115
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/tools/arc/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Tools to read the
+ * <a href="http://archive.org/web/researcher/ArcFileFormat.php">Arc file format</a>.
+ */
+package org.apache.nutch.tools.arc;
+
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/package-info.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/package-info.java b/nutch-core/src/main/java/org/apache/nutch/tools/package-info.java
new file mode 100644
index 0000000..3b868c5
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/tools/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Miscellaneous tools.
+ */
+package org.apache.nutch.tools;
+
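End to end, ArcSegmentCreator is a standard Hadoop Tool driven through ToolRunner, as its main() shows. A minimal sketch of invoking it programmatically rather than from the command line (the two paths are placeholders):

    import org.apache.hadoop.util.ToolRunner;
    import org.apache.nutch.tools.arc.ArcSegmentCreator;
    import org.apache.nutch.util.NutchConfiguration;

    public class ArcImportExample {
      public static void main(String[] args) throws Exception {
        // equivalent to: ArcSegmentCreator <arcFiles> <segmentsOutDir>
        int res = ToolRunner.run(NutchConfiguration.create(),
            new ArcSegmentCreator(),
            new String[] { "/data/arcs", "/data/crawl/segments" });
        System.exit(res);
      }
    }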
