Author: jnioche
Date: Tue Sep 22 14:04:10 2015
New Revision: 1704634
URL: http://svn.apache.org/viewvc?rev=1704634&view=rev
Log:
NUTCH-2102 WARC Exporter
Added:
nutch/trunk/src/java/org/apache/nutch/tools/warc/
nutch/trunk/src/java/org/apache/nutch/tools/warc/WARCExporter.java
nutch/trunk/src/java/org/apache/nutch/tools/warc/package-info.java
Modified:
nutch/trunk/CHANGES.txt
nutch/trunk/ivy/ivy.xml
nutch/trunk/src/bin/nutch
Modified: nutch/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/nutch/trunk/CHANGES.txt?rev=1704634&r1=1704633&r2=1704634&view=diff
==============================================================================
--- nutch/trunk/CHANGES.txt (original)
+++ nutch/trunk/CHANGES.txt Tue Sep 22 14:04:10 2015
@@ -2,6 +2,8 @@ Nutch Change Log
Nutch Current Development 1.11-SNAPSHOT
+* NUTCH-2102 WARC Exporter (jnioche)
+
* NUTCH-2106 Runtime to contain Selenium and dependencies only once (snagel)
* NUTCH-2104 Add documentation to the protocol-selenium plugin Readme file
Modified: nutch/trunk/ivy/ivy.xml
URL:
http://svn.apache.org/viewvc/nutch/trunk/ivy/ivy.xml?rev=1704634&r1=1704633&r2=1704634&view=diff
==============================================================================
--- nutch/trunk/ivy/ivy.xml (original)
+++ nutch/trunk/ivy/ivy.xml Tue Sep 22 14:04:10 2015
@@ -70,6 +70,8 @@
<dependency org="com.google.guava" name="guava" rev="16.0.1" />
<dependency org="com.github.crawler-commons"
name="crawler-commons" rev="0.6" />
+
+ <dependency org="com.martinkl.warc" name="warc-hadoop"
rev="0.1.0" />
<dependency org="org.apache.cxf" name="cxf" rev="3.0.4"
conf="*->default"/>
<dependency org="org.apache.cxf" name="cxf-rt-frontend-jaxws"
rev="3.0.4" conf="*->default"/>
Modified: nutch/trunk/src/bin/nutch
URL:
http://svn.apache.org/viewvc/nutch/trunk/src/bin/nutch?rev=1704634&r1=1704633&r2=1704634&view=diff
==============================================================================
--- nutch/trunk/src/bin/nutch (original)
+++ nutch/trunk/src/bin/nutch Tue Sep 22 14:04:10 2015
@@ -87,6 +87,7 @@ if [ $# = 0 ]; then
echo " plugin load a plugin and run one of its classes main()"
echo " junit runs the given JUnit test"
echo " startserver runs the Nutch Server on localhost:8081"
+ echo " warc exports crawled data from segments at the WARC
format"
echo " or"
echo " CLASSNAME run the class named CLASSNAME"
echo "Most commands print help when invoked w/o parameters."
@@ -278,6 +279,8 @@ elif [ "$COMMAND" = "junit" ] ; then
CLASS=org.junit.runner.JUnitCore
elif [ "$COMMAND" = "startserver" ] ; then
CLASS=org.apache.nutch.service.NutchServer
+elif [ "$COMMAND" = "warc" ] ; then
+ CLASS=org.apache.nutch.tools.warc.WARCExporter
else
CLASS=$COMMAND
fi
Added: nutch/trunk/src/java/org/apache/nutch/tools/warc/WARCExporter.java
URL:
http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/tools/warc/WARCExporter.java?rev=1704634&view=auto
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/tools/warc/WARCExporter.java (added)
+++ nutch/trunk/src/java/org/apache/nutch/tools/warc/WARCExporter.java Tue Sep
22 14:04:10 2015
@@ -0,0 +1,333 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.tools.warc;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.UUID;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.NutchWritable;
+import org.apache.nutch.parse.ParseSegment;
+import org.apache.nutch.protocol.Content;
+import org.apache.nutch.util.HadoopFSUtil;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.TimingUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.martinkl.warc.WARCRecord;
+import com.martinkl.warc.WARCWritable;
+import com.martinkl.warc.mapred.WARCOutputFormat;
+
+/**
+ * MapReduce job to exports Nutch segments as WARC files. The file format is
+ * documented in the [ISO
+ *
Standard](http://bibnum.bnf.fr/warc/WARC_ISO_28500_version1_latestdraft.pdf).
+ * Generates elements of type response if the configuration
'store.http.headers'
+ * was set to true during the fetching and the http headers were stored
+ * verbatim; generates elements of type 'resource' otherwise.
+ **/
+
+public class WARCExporter extends Configured implements Tool {
+
+ public static Logger LOG = LoggerFactory.getLogger(WARCExporter.class);
+
+ private static final String CRLF = "\r\n";
+ private static final byte[] CRLF_BYTES = { 13, 10 };
+
+ public WARCExporter() {
+ super(null);
+ }
+
+ public WARCExporter(Configuration conf) {
+ super(conf);
+ }
+
+ public static class WARCReducer
+ implements Mapper<Text, Writable, Text, NutchWritable>,
+ Reducer<Text, NutchWritable, NullWritable, WARCWritable> {
+
+ SimpleDateFormat warcdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'",
+ Locale.ENGLISH);
+
+ @Override
+ public void configure(JobConf job) {
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ public void map(Text key, Writable value,
+ OutputCollector<Text, NutchWritable> output, Reporter reporter)
+ throws IOException {
+ output.collect(key, new NutchWritable(value));
+ }
+
+ @Override
+ public void reduce(Text key, Iterator<NutchWritable> values,
+ OutputCollector<NullWritable, WARCWritable> output, Reporter reporter)
+ throws IOException {
+
+ Content content = null;
+ CrawlDatum cd = null;
+
+ // aggregate the values found
+ while (values.hasNext()) {
+ final Writable value = values.next().get(); // unwrap
+ if (value instanceof Content) {
+ content = (Content) value;
+ continue;
+ }
+ if (value instanceof CrawlDatum) {
+ cd = (CrawlDatum) value;
+ continue;
+ }
+ }
+
+ // check that we have everything we need
+ if (content == null) {
+ LOG.info("Missing content for {}", key);
+ reporter.getCounter("WARCExporter", "missing content").increment(1);
+ return;
+ }
+
+ if (cd == null) {
+ LOG.info("Missing fetch datum for {}", key);
+ reporter.getCounter("WARCExporter", "missing metadata").increment(1);
+ return;
+ }
+
+ // were the headers stored as is? Can write a response element then
+ String headersVerbatim = content.getMetadata().get("_response.headers_");
+ byte[] httpheaders = new byte[0];
+ if (StringUtils.isNotBlank(headersVerbatim)) {
+ // check that ends with an empty line
+ if (!headersVerbatim.endsWith(CRLF + CRLF)) {
+ headersVerbatim += CRLF + CRLF;
+ }
+ httpheaders = headersVerbatim.getBytes();
+ }
+
+ StringBuilder buffer = new StringBuilder();
+ buffer.append(WARCRecord.WARC_VERSION);
+ buffer.append(CRLF);
+
+ buffer.append("WARC-Record-ID").append(": ").append("<urn:uuid:")
+ .append(UUID.randomUUID().toString()).append(">").append(CRLF);
+
+ int contentLength = 0;
+ if (content != null) {
+ contentLength = content.getContent().length;
+ }
+
+ // add the length of the http header
+ contentLength += httpheaders.length;
+
+ buffer.append("Content-Length").append(": ")
+ .append(Integer.toString(contentLength)).append(CRLF);
+
+ Date fetchedDate = new Date(cd.getFetchTime());
+ buffer.append("WARC-Date").append(":
").append(warcdf.format(fetchedDate))
+ .append(CRLF);
+
+ // check if http headers have been stored verbatim
+ // if not generate a response instead
+ String WARCTypeValue = "resource";
+
+ if (StringUtils.isNotBlank(headersVerbatim)) {
+ WARCTypeValue = "response";
+ }
+
+ buffer.append("WARC-Type").append(": ").append(WARCTypeValue)
+ .append(CRLF);
+
+ // "WARC-IP-Address" if present
+ String IP = content.getMetadata().get("_ip_");
+ if (StringUtils.isNotBlank(IP)) {
+ buffer.append("WARC-IP-Address").append(":
").append("IP").append(CRLF);
+ }
+
+ // detect if truncated only for fetch success
+ String status = CrawlDatum.getStatusName(cd.getStatus());
+ if (status.equalsIgnoreCase("STATUS_FETCH_SUCCESS")
+ && ParseSegment.isTruncated(content)) {
+ buffer.append("WARC-Truncated").append(": ").append("unspecified")
+ .append(CRLF);
+ }
+
+ // must be a valid URI
+ try {
+ String normalised = key.toString().replaceAll(" ", "%20");
+ URI uri = URI.create(normalised);
+ buffer.append("WARC-Target-URI").append(": ")
+ .append(uri.toASCIIString()).append(CRLF);
+ } catch (Exception e) {
+ LOG.error("Invalid URI {} ", key);
+ reporter.getCounter("WARCExporter", "invalid URI").increment(1);
+ return;
+ }
+
+ // provide a ContentType if type response
+ if (WARCTypeValue.equals("response")) {
+ buffer.append("Content-Type: application/http; msgtype=response")
+ .append(CRLF);
+ }
+
+ // finished writing the WARC headers, now let's serialize it
+
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+
+ // store the headers
+ bos.write(buffer.toString().getBytes("UTF-8"));
+ bos.write(CRLF_BYTES);
+ // the http headers
+ bos.write(httpheaders);
+
+ // the binary content itself
+ if (content.getContent() != null) {
+ bos.write(content.getContent());
+ }
+ bos.write(CRLF_BYTES);
+ bos.write(CRLF_BYTES);
+
+ try {
+ DataInput in = new DataInputStream(
+ new ByteArrayInputStream(bos.toByteArray()));
+ WARCRecord record = new WARCRecord(in);
+ output.collect(NullWritable.get(), new WARCWritable(record));
+ reporter.getCounter("WARCExporter", "records generated").increment(1);
+ } catch (IOException exception) {
+ LOG.error("Exception when generating WARC record for {} : {}", key,
+ exception.getMessage(), exception);
+ reporter.getCounter("WARCExporter", "exception").increment(1);
+ }
+
+ }
+ }
+
+ public int generateWARC(String output, List<Path> segments) {
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ long start = System.currentTimeMillis();
+ LOG.info("WARCExporter: starting at {}", sdf.format(start));
+
+ final JobConf job = new NutchJob(getConf());
+ job.setJobName("warc-exporter " + output);
+
+ for (final Path segment : segments) {
+ LOG.info("warc-exporter: adding segment: " + segment);
+ FileInputFormat.addInputPath(job, new Path(segment, Content.DIR_NAME));
+ FileInputFormat.addInputPath(job,
+ new Path(segment, CrawlDatum.FETCH_DIR_NAME));
+ }
+
+ job.setInputFormat(SequenceFileInputFormat.class);
+
+ job.setMapperClass(WARCReducer.class);
+ job.setReducerClass(WARCReducer.class);
+
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(NutchWritable.class);
+
+ FileOutputFormat.setOutputPath(job, new Path(output));
+ // using the old api
+ job.setOutputFormat(WARCOutputFormat.class);
+
+ job.setOutputKeyClass(NullWritable.class);
+ job.setOutputValueClass(WARCWritable.class);
+
+ try {
+ RunningJob rj = JobClient.runJob(job);
+ LOG.info(rj.getCounters().toString());
+ long end = System.currentTimeMillis();
+ LOG.info("WARCExporter: finished at {}, elapsed: {}", sdf.format(end),
+ TimingUtil.elapsedTime(start, end));
+ } catch (Exception e) {
+ LOG.error("Exception caught", e);
+ return -1;
+ }
+
+ return 0;
+ }
+
+ public int run(String[] args) throws Exception {
+ if (args.length < 2) {
+ System.err.println(
+ "Usage: WARCExporter <output> (<segment> ... | -dir <segments>)");
+ return -1;
+ }
+
+ final List<Path> segments = new ArrayList<Path>();
+
+ for (int i = 1; i < args.length; i++) {
+ if (args[i].equals("-dir")) {
+ Path dir = new Path(args[++i]);
+ FileSystem fs = dir.getFileSystem(getConf());
+ FileStatus[] fstats = fs.listStatus(dir,
+ HadoopFSUtil.getPassDirectoriesFilter(fs));
+ Path[] files = HadoopFSUtil.getPaths(fstats);
+ for (Path p : files) {
+ segments.add(p);
+ }
+ } else {
+ segments.add(new Path(args[i]));
+ }
+ }
+
+ return generateWARC(args[0], segments);
+ }
+
+ public static void main(String[] args) throws Exception {
+ final int res = ToolRunner.run(NutchConfiguration.create(),
+ new WARCExporter(), args);
+ System.exit(res);
+ }
+}
Added: nutch/trunk/src/java/org/apache/nutch/tools/warc/package-info.java
URL:
http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/tools/warc/package-info.java?rev=1704634&view=auto
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/tools/warc/package-info.java (added)
+++ nutch/trunk/src/java/org/apache/nutch/tools/warc/package-info.java Tue Sep
22 14:04:10 2015
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+
/**
 * Tools to export Nutch segments as
 * <a href="http://bibnum.bnf.fr/warc/WARC_ISO_28500_version1_latestdraft.pdf">
 * WARC archives</a>. Currently export only, via {@code WARCExporter}; no
 * WARC importer is provided in this package.
 */
package org.apache.nutch.tools.warc;