Author: jorgelbg
Date: Tue Sep 22 12:23:31 2015
New Revision: 1704594
URL: http://svn.apache.org/viewvc?rev=1704594&view=rev
Log:
NUTCH-2095 WARC exporter for the CommonCrawlDataDumper
Added:
nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormatWARC.java
nutch/trunk/src/java/org/apache/nutch/tools/WARCUtils.java
Modified:
nutch/trunk/conf/nutch-default.xml
nutch/trunk/ivy/ivy.xml
nutch/trunk/src/java/org/apache/nutch/tools/AbstractCommonCrawlFormat.java
nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlConfig.java
nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlDataDumper.java
nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormat.java
nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormatFactory.java
nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormatJackson.java
nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormatJettinson.java
nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormatSimple.java
nutch/trunk/src/plugin/protocol-http/src/java/org/apache/nutch/protocol/http/HttpResponse.java
Modified: nutch/trunk/conf/nutch-default.xml
URL:
http://svn.apache.org/viewvc/nutch/trunk/conf/nutch-default.xml?rev=1704594&r1=1704593&r2=1704594&view=diff
==============================================================================
--- nutch/trunk/conf/nutch-default.xml (original)
+++ nutch/trunk/conf/nutch-default.xml Tue Sep 22 12:23:31 2015
@@ -1878,4 +1878,22 @@ CAUTION: Set the parser.timeout to -1 or
</description>
</property>
+<property>
+ <name>store.http.request</name>
+ <value>false</value>
+ <description>
+ Store the raw request made by Nutch, required to use the
CommonCrawlDataDumper
+ tool for the WARC format.
+ </description>
+</property>
+
+<property>
+ <name>store.http.headers</name>
+ <value>false</value>
+ <description>
+ Store the raw headers received by Nutch from the server, required to use
the
+ CommonCrawlDataDumper tool for the WARC format.
+ </description>
+</property>
+
</configuration>
Modified: nutch/trunk/ivy/ivy.xml
URL:
http://svn.apache.org/viewvc/nutch/trunk/ivy/ivy.xml?rev=1704594&r1=1704593&r2=1704594&view=diff
==============================================================================
--- nutch/trunk/ivy/ivy.xml (original)
+++ nutch/trunk/ivy/ivy.xml Tue Sep 22 12:23:31 2015
@@ -81,6 +81,11 @@
<dependency org="com.fasterxml.jackson.dataformat"
name="jackson-dataformat-cbor" rev="2.5.1" conf="*->default"/>
<dependency org="com.fasterxml.jackson.jaxrs"
name="jackson-jaxrs-json-provider" rev="2.5.1" conf="*->default"/>
+ <!-- WARC artifacts needed -->
+ <dependency org="org.netpreserve.commons"
name="webarchive-commons" rev="1.1.5" conf="*->default">
+ <exclude module="hadoop-core"/>
+ </dependency>
+
<!--artifacts needed for testing -->
<dependency org="junit" name="junit" rev="4.11"
conf="test->default" />
<!--dependency org="org.apache.hadoop" name="hadoop-test"
rev="1.2.0" conf="test->default" /-->
Modified:
nutch/trunk/src/java/org/apache/nutch/tools/AbstractCommonCrawlFormat.java
URL:
http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/tools/AbstractCommonCrawlFormat.java?rev=1704594&r1=1704593&r2=1704594&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/tools/AbstractCommonCrawlFormat.java
(original)
+++ nutch/trunk/src/java/org/apache/nutch/tools/AbstractCommonCrawlFormat.java
Tue Sep 22 12:23:31 2015
@@ -18,12 +18,19 @@
package org.apache.nutch.tools;
import java.io.IOException;
+import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
+import java.net.URLEncoder;
import java.net.UnknownHostException;
import java.text.ParseException;
+import org.apache.commons.httpclient.URIException;
+import org.apache.commons.httpclient.util.URIUtil;
+import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.nutch.metadata.Metadata;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.protocol.Content;
import org.apache.nutch.util.URLUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,32 +42,32 @@ import com.ibm.icu.text.SimpleDateFormat
*
*/
public abstract class AbstractCommonCrawlFormat implements CommonCrawlFormat {
- private static final Logger LOG =
LoggerFactory.getLogger(AbstractCommonCrawlFormat.class.getName());
-
+ protected static final Logger LOG =
LoggerFactory.getLogger(AbstractCommonCrawlFormat.class.getName());
+
protected String url;
-
- protected byte[] content;
-
+
+ protected Content content;
+
protected Metadata metadata;
-
+
protected Configuration conf;
-
+
protected String keyPrefix;
-
+
protected boolean simpleDateFormat;
-
+
protected boolean jsonArray;
-
+
protected boolean reverseKey;
-
+
protected String reverseKeyValue;
- public AbstractCommonCrawlFormat(String url, byte[] content, Metadata
metadata, Configuration nutchConf, CommonCrawlConfig config) throws IOException
{
+ public AbstractCommonCrawlFormat(String url, Content content, Metadata
metadata, Configuration nutchConf, CommonCrawlConfig config) throws IOException
{
this.url = url;
this.content = content;
this.metadata = metadata;
this.conf = nutchConf;
-
+
this.keyPrefix = config.getKeyPrefix();
this.simpleDateFormat = config.getSimpleDateFormat();
this.jsonArray = config.getJsonArray();
@@ -68,17 +75,33 @@ public abstract class AbstractCommonCraw
this.reverseKeyValue = config.getReverseKeyValue();
}
+ public String getJsonData(String url, Content content, Metadata
metadata)
+ throws IOException {
+ this.url = url;
+ this.content = content;
+ this.metadata = metadata;
+
+ return this.getJsonData();
+ }
+
+ public String getJsonData(String url, Content content, Metadata
metadata,
+ ParseData parseData) throws IOException {
+
+ // override of this is required in the actual formats
+ throw new NotImplementedException();
+ }
+
@Override
public String getJsonData() throws IOException {
try {
startObject(null);
-
+
// url
writeKeyValue("url", getUrl());
-
+
// timestamp
writeKeyValue("timestamp", getTimestamp());
-
+
// request
startObject("request");
writeKeyValue("method", getMethod());
@@ -102,7 +125,7 @@ public abstract class AbstractCommonCraw
closeHeaders("headers", false, true);
writeKeyNull("body");
closeObject("request");
-
+
// response
startObject("response");
writeKeyValue("status", getResponseStatus());
@@ -125,50 +148,56 @@ public abstract class AbstractCommonCraw
closeHeaders("headers", false, true);
writeKeyValue("body", getResponseContent());
closeObject("response");
-
+
// key
if (!this.keyPrefix.isEmpty()) {
this.keyPrefix += "-";
}
writeKeyValue("key", this.keyPrefix + getKey());
-
+
// imported
writeKeyValue("imported", getImported());
-
+
closeObject(null);
-
+
return generateJson();
-
+
} catch (IOException ioe) {
LOG.warn("Error in processing file " + url + ": " +
ioe.getMessage());
- throw new IOException("Error in generating JSON:" +
ioe.getMessage());
+ throw new IOException("Error in generating JSON:" +
ioe.getMessage());
}
}
-
+
// abstract methods
-
+
protected abstract void writeKeyValue(String key, String value) throws
IOException;
-
+
protected abstract void writeKeyNull(String key) throws IOException;
-
+
protected abstract void startArray(String key, boolean nested, boolean
newline) throws IOException;
-
+
protected abstract void closeArray(String key, boolean nested, boolean
newline) throws IOException;
-
+
protected abstract void writeArrayValue(String value) throws
IOException;
-
+
protected abstract void startObject(String key) throws IOException;
-
+
protected abstract void closeObject(String key) throws IOException;
-
+
protected abstract String generateJson() throws IOException;
-
+
// getters
-
+
protected String getUrl() {
+ try {
+ return URIUtil.encodePath(url);
+ } catch (URIException e) {
+ LOG.error("Can't encode URL " + url);
+ }
+
return url;
}
-
+
protected String getTimestamp() {
if (this.simpleDateFormat) {
String timestamp = null;
@@ -183,88 +212,88 @@ public abstract class AbstractCommonCraw
return
ifNullString(metadata.get(Metadata.LAST_MODIFIED));
}
}
-
+
protected String getMethod() {
return new String("GET");
}
-
+
protected String getRequestHostName() {
String hostName = "";
try {
hostName = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException uhe) {
-
+
}
return hostName;
}
-
+
protected String getRequestHostAddress() {
String hostAddress = "";
try {
hostAddress =
InetAddress.getLocalHost().getHostAddress();
} catch (UnknownHostException uhe) {
-
+
}
return hostAddress;
}
-
+
protected String getRequestSoftware() {
return conf.get("http.agent.version", "");
}
-
+
protected String getRequestRobots() {
return new String("CLASSIC");
}
-
+
protected String getRequestContactName() {
return conf.get("http.agent.name", "");
}
-
+
protected String getRequestContactEmail() {
return conf.get("http.agent.email", "");
}
-
+
protected String getRequestAccept() {
return conf.get("http.accept", "");
}
-
+
protected String getRequestAcceptEncoding() {
return new String(""); // TODO
}
-
+
protected String getRequestAcceptLanguage() {
return conf.get("http.accept.language", "");
}
-
+
protected String getRequestUserAgent() {
return conf.get("http.robots.agents", "");
}
-
+
protected String getResponseStatus() {
return ifNullString(metadata.get("status"));
}
-
+
protected String getResponseHostName() {
return URLUtil.getHost(url);
}
-
+
protected String getResponseAddress() {
return ifNullString(metadata.get("_ip_"));
}
-
+
protected String getResponseContentEncoding() {
return ifNullString(metadata.get("Content-Encoding"));
}
-
+
protected String getResponseContentType() {
return ifNullString(metadata.get("Content-Type"));
}
-
+
protected String getResponseDate() {
if (this.simpleDateFormat) {
String timestamp = null;
try {
- long epoch = new SimpleDateFormat("EEE, d MMM
yyyy HH:mm:ss z").parse(ifNullString(metadata.get("Date"))).getTime();
+ long epoch = new SimpleDateFormat("EEE, dd MMM
yyyy HH:mm:ss z").parse(ifNullString(metadata.get("Date"))).getTime();
timestamp = String.valueOf(epoch);
} catch (ParseException pe) {
LOG.warn(pe.getMessage());
@@ -274,15 +303,15 @@ public abstract class AbstractCommonCraw
return ifNullString(metadata.get("Date"));
}
}
-
+
protected String getResponseServer() {
return ifNullString(metadata.get("Server"));
}
-
+
protected String getResponseContent() {
- return new String(content);
+ return new String(content.getContent());
}
-
+
protected String getKey() {
if (this.reverseKey) {
return this.reverseKeyValue;
@@ -291,7 +320,7 @@ public abstract class AbstractCommonCraw
return url;
}
}
-
+
protected String getImported() {
if (this.simpleDateFormat) {
String timestamp = null;
@@ -306,11 +335,11 @@ public abstract class AbstractCommonCraw
return ifNullString(metadata.get("Date"));
}
}
-
+
private static String ifNullString(String value) {
return (value != null) ? value : "";
}
-
+
private void startHeaders(String key, boolean nested, boolean newline)
throws IOException {
if (this.jsonArray) {
startArray(key, nested, newline);
@@ -319,7 +348,7 @@ public abstract class AbstractCommonCraw
startObject(key);
}
}
-
+
private void closeHeaders(String key, boolean nested, boolean newline)
throws IOException {
if (this.jsonArray) {
closeArray(key, nested, newline);
@@ -328,7 +357,7 @@ public abstract class AbstractCommonCraw
closeObject(key);
}
}
-
+
private void writeKeyValueWrapper(String key, String value) throws
IOException {
if (this.jsonArray) {
startArray(null, true, false);
@@ -340,4 +369,7 @@ public abstract class AbstractCommonCraw
writeKeyValue(key, value);
}
}
+
+ @Override
+ public void close() {}
}
Modified: nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlConfig.java
URL:
http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlConfig.java?rev=1704594&r1=1704593&r2=1704594&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlConfig.java
(original)
+++ nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlConfig.java Tue Sep
22 12:23:31 2015
@@ -39,6 +39,12 @@ public class CommonCrawlConfig implement
private boolean reverseKey = false;
private String reverseKeyValue = "";
+
+ private boolean compressed = false;
+
+ private long warcSize = 0;
+
+ private String outputDir;
/**
* Default constructor
@@ -114,4 +120,28 @@ public class CommonCrawlConfig implement
public String getReverseKeyValue() {
return this.reverseKeyValue;
}
+
+ public boolean isCompressed() {
+ return compressed;
+ }
+
+ public void setCompressed(boolean compressed) {
+ this.compressed = compressed;
+ }
+
+ public long getWarcSize() {
+ return warcSize;
+ }
+
+ public void setWarcSize(long warcSize) {
+ this.warcSize = warcSize;
+ }
+
+ public String getOutputDir() {
+ return outputDir;
+ }
+
+ public void setOutputDir(String outputDir) {
+ this.outputDir = outputDir;
+ }
}
Modified: nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlDataDumper.java
URL:
http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlDataDumper.java?rev=1704594&r1=1704593&r2=1704594&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlDataDumper.java
(original)
+++ nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlDataDumper.java Tue
Sep 22 12:23:31 2015
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,6 +18,7 @@
package org.apache.nutch.tools;
//JDK imports
+
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -51,13 +52,17 @@ import org.apache.commons.io.FilenameUti
//Hadoop
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.metadata.Metadata;
import org.apache.nutch.protocol.Content;
+import org.apache.nutch.segment.SegmentReader;
import org.apache.nutch.util.DumpFileUtil;
import org.apache.nutch.util.NutchConfiguration;
//Tika imports
@@ -90,11 +95,11 @@ import com.ibm.icu.text.SimpleDateFormat
* <p>
* Thus, the timestamped url key for the record is provided below followed by
an
* example record:
- *
+ * <p/>
* <pre>
* {@code
* com_somepage_33a3e36bbef59c2a5242c2ccee59239ab30d51f3_1411623696000
- *
+ *
* {
* "url": "http:\/\/somepage.com\/22\/14560817",
* "timestamp": "1411623696000",
@@ -133,23 +138,23 @@ import com.ibm.icu.text.SimpleDateFormat
* "Server": "nginx",
* "...": "..."
* },
- * "body": "\r\n <!DOCTYPE html PUBLIC ... \r\n\r\n \r\n
</body>\r\n </html>\r\n \r\n\r\n",
+ * "body": "\r\n <!DOCTYPE html PUBLIC ... \r\n\r\n \r\n
</body>\r\n </html>\r\n \r\n\r\n",
* },
* "key":
"com_somepage_33a3e36bbef59c2a5242c2ccee59239ab30d51f3_1411623696000",
* "imported": "1411623698000"
* }
* }
* </pre>
- *
+ * <p/>
* <p>
* Upon successful completion the tool displays a very convenient JSON snippet
* detailing the mimetype classifications and the counts of documents which
fall
* into those classifications. An example is as follows:
* </p>
- *
+ * <p/>
* <pre>
* {@code
- * INFO: File Types:
+ * INFO: File Types:
* TOTAL Stats: {
* {"mimeType":"application/xml","count":19"}
* {"mimeType":"image/png","count":47"}
@@ -164,465 +169,522 @@ import com.ibm.icu.text.SimpleDateFormat
* }
* }
* </pre>
- *
*/
-public class CommonCrawlDataDumper {
+public class CommonCrawlDataDumper extends Configured implements Tool {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(CommonCrawlDataDumper.class.getName());
+
+ private CommonCrawlConfig config = null;
+
+ // Gzip initialization
+ private FileOutputStream fileOutput = null;
+ private BufferedOutputStream bufOutput = null;
+ private GzipCompressorOutputStream gzipOutput = null;
+ private TarArchiveOutputStream tarOutput = null;
+ private ArrayList<String> fileList = null;
+
+ /**
+ * Main method for invoking this tool
+ *
+ * @param args 1) output directory (which will be created if it does not
+ * already exist) to host the CBOR data and 2) a directory
+ * containing one or more segments from which we wish to generate
+ * CBOR data from. Optionally, 3) a list of mimetypes and the 4)
+ * the gzip option may be provided.
+ * @throws Exception
+ */
+ @SuppressWarnings("static-access")
+ public static void main(String[] args) throws Exception {
+ Configuration conf = NutchConfiguration.create();
+ int res = ToolRunner.run(conf, new CommonCrawlDataDumper(), args);
+ System.exit(res);
+ }
+
+ /**
+ * Constructor
+ */
+ public CommonCrawlDataDumper(CommonCrawlConfig config) {
+ this.config = config;
+ }
+
+ public CommonCrawlDataDumper() {
+ }
+
+ /**
+ * Dumps the reverse engineered CBOR content from the provided segment
+ * directories if a parent directory contains more than one segment,
+ * otherwise a single segment can be passed as an argument. If the boolean
+ * argument is provided then the CBOR is also zipped.
+ *
+ * @param outputDir the directory you wish to dump the raw content to.
This
+ * directory will be created.
+ * @param segmentRootDir a directory containing one or more segments.
+ * @param gzip a boolean flag indicating whether the CBOR content
should also
+ * be gzipped.
+   * @param epochFilename if {@code true}, output files will be named using
the epoch time (in milliseconds).
+ * @param extension a file extension to use with output documents.
+ * @throws Exception if any exception occurs.
+ */
+ public void dump(File outputDir, File segmentRootDir, boolean gzip,
+ String[] mimeTypes, boolean epochFilename, String extension, boolean
warc)
+ throws Exception {
+ if (gzip) {
+ LOG.info("Gzipping CBOR data has been skipped");
+ }
+ // total file counts
+ Map<String, Integer> typeCounts = new HashMap<String, Integer>();
+ // filtered file counters
+ Map<String, Integer> filteredCounts = new HashMap<String, Integer>();
+
+ Configuration nutchConfig = NutchConfiguration.create();
+ FileSystem fs = FileSystem.get(nutchConfig);
+ File[] segmentDirs = segmentRootDir.listFiles(new FileFilter() {
+ @Override
+ public boolean accept(File file) {
+ return file.canRead() && file.isDirectory();
+ }
+ });
+
+ if (new File(
+ segmentRootDir.getAbsolutePath() + File.separator + Content.DIR_NAME
+ + "/part-00000/data").exists()) {
+ segmentDirs = new File[] { segmentRootDir };
+ }
+
+ if (segmentDirs == null) {
+ LOG.error(
+ "No segment directories found in [" +
segmentRootDir.getAbsolutePath()
+ + "]");
+ System.exit(1);
+ }
+
+ if (gzip && !warc) {
+ fileList = new ArrayList<String>();
+ constructNewStream(outputDir);
+ }
+
+ CommonCrawlFormat format = CommonCrawlFormatFactory
+ .getCommonCrawlFormat("JACKSON", nutchConfig, config);
+
+ if (warc) {
+ format = CommonCrawlFormatFactory
+ .getCommonCrawlFormat("WARC", nutchConfig, config);
+ }
+
+ for (File segment : segmentDirs) {
+ LOG.info("Processing segment: [" + segment.getAbsolutePath() + "]");
+ try {
+ String segmentContentPath =
+ segment.getAbsolutePath() + File.separator + Content.DIR_NAME
+ + "/part-00000/data";
+ Path file = new Path(segmentContentPath);
+
+ if (!new File(file.toString()).exists()) {
+ LOG.warn("Skipping segment: [" + segmentContentPath
+ + "]: no data directory present");
+ continue;
+ }
+ SequenceFile.Reader reader = new SequenceFile.Reader(nutchConfig,
+ SequenceFile.Reader.file(file));
+
+ if (!new File(file.toString()).exists()) {
+ LOG.warn("Skipping segment: [" + segmentContentPath
+ + "]: no data directory present");
+ continue;
+ }
+
+ Writable key = (Writable) reader.getKeyClass().newInstance();
+
+ Content content = null;
+
+ while (reader.next(key)) {
+ content = new Content();
+ reader.getCurrentValue(content);
+ Metadata metadata = content.getMetadata();
+ String url = key.toString();
+
+ String baseName = FilenameUtils.getBaseName(url);
+ String extensionName = FilenameUtils.getExtension(url);
+
+ if (!extension.isEmpty()) {
+ extensionName = extension;
+ } else if ((extensionName == null) || extensionName.isEmpty()) {
+ extensionName = "html";
+ }
+
+ String outputFullPath = null;
+ String outputRelativePath = null;
+ String filename = null;
+ String timestamp = null;
+ String reverseKey = null;
+
+ if (epochFilename || config.getReverseKey()) {
+ try {
+ long epoch = new SimpleDateFormat("EEE, d MMM yyyy HH:mm:ss z")
+ .parse(getDate(metadata.get("Date"))).getTime();
+ timestamp = String.valueOf(epoch);
+ } catch (ParseException pe) {
+ LOG.warn(pe.getMessage());
+ }
+
+ reverseKey = reverseUrl(url);
+ config.setReverseKeyValue(
+ reverseKey.replace("/", "_") + "_" + DigestUtils.shaHex(url)
+ + "_" + timestamp);
+ }
+
+ if (!warc) {
+ if (epochFilename) {
+ outputFullPath = DumpFileUtil
+ .createFileNameFromUrl(outputDir.getAbsolutePath(),
+ reverseKey, url, timestamp, extensionName, !gzip);
+ outputRelativePath = outputFullPath
+ .substring(0, outputFullPath.lastIndexOf(File.separator) -
1);
+ filename = content.getMetadata().get(Metadata.DATE) + "."
+ + extensionName;
+ } else {
+ String md5Ofurl = DumpFileUtil.getUrlMD5(url);
+ String fullDir = DumpFileUtil
+ .createTwoLevelsDirectory(outputDir.getAbsolutePath(),
+ md5Ofurl, !gzip);
+ filename = DumpFileUtil
+ .createFileName(md5Ofurl, baseName, extensionName);
+ outputFullPath = String.format("%s/%s", fullDir, filename);
+
+ String[] fullPathLevels = fullDir.split(File.separator);
+ String firstLevelDirName = fullPathLevels[fullPathLevels.length
+ - 2];
+ String secondLevelDirName = fullPathLevels[fullPathLevels.length
+ - 1];
+ outputRelativePath = firstLevelDirName + secondLevelDirName;
+ }
+ }
+ // Encode all filetypes if no mimetypes have been given
+ Boolean filter = (mimeTypes == null);
+
+ String jsonData = "";
+ try {
+ String mimeType = new Tika().detect(content.getContent());
+ // Maps file to JSON-based structure
+
+ jsonData = format.getJsonData(url, content, metadata);
+
+ collectStats(typeCounts, mimeType);
+ // collects statistics for the given mimetypes
+ if ((mimeType != null) && (mimeTypes != null) && Arrays
+ .asList(mimeTypes).contains(mimeType)) {
+ collectStats(filteredCounts, mimeType);
+ filter = true;
+ }
+ } catch (IOException ioe) {
+ LOG.error("Fatal error in creating JSON data: " +
ioe.getMessage());
+ return;
+ }
+
+ if (!warc) {
+ if (filter) {
+ byte[] byteData = serializeCBORData(jsonData);
+
+ if (!gzip) {
+ File outputFile = new File(outputFullPath);
+ if (outputFile.exists()) {
+ LOG.info("Skipping writing: [" + outputFullPath
+ + "]: file already exists");
+ } else {
+ LOG.info("Writing: [" + outputFullPath + "]");
+ IOUtils.copy(new ByteArrayInputStream(byteData),
+ new FileOutputStream(outputFile));
+ }
+ } else {
+ if (fileList.contains(outputFullPath)) {
+ LOG.info("Skipping compressing: [" + outputFullPath
+ + "]: file already exists");
+ } else {
+ fileList.add(outputFullPath);
+ LOG.info("Compressing: [" + outputFullPath + "]");
+ //TarArchiveEntry tarEntry = new
TarArchiveEntry(firstLevelDirName + File.separator + secondLevelDirName +
File.separator + filename);
+ TarArchiveEntry tarEntry = new TarArchiveEntry(
+ outputRelativePath + File.separator + filename);
+ tarEntry.setSize(byteData.length);
+ tarOutput.putArchiveEntry(tarEntry);
+ tarOutput.write(byteData);
+ tarOutput.closeArchiveEntry();
+ }
+ }
+ }
+ }
+ }
+
+ reader.close();
+ } finally {
+ fs.close();
+ }
+ }
+
+ // close the format if needed
+ format.close();
+
+ if (gzip && !warc) {
+ closeStream();
+ }
+
+ if (!typeCounts.isEmpty()) {
+ LOG.info("CommonsCrawlDataDumper File Stats: " + DumpFileUtil
+ .displayFileTypes(typeCounts, filteredCounts));
+ }
+
+ }
+
+ private void closeStream() {
+ try {
+ tarOutput.finish();
+
+ tarOutput.close();
+ gzipOutput.close();
+ bufOutput.close();
+ fileOutput.close();
+ } catch (IOException ioe) {
+ LOG.warn("Error in closing stream: " + ioe.getMessage());
+ }
+ }
+
+ private void constructNewStream(File outputDir) throws IOException {
+ String archiveName = new SimpleDateFormat("yyyyMMddhhmm'.tar.gz'")
+ .format(new Date());
+ LOG.info("Creating a new gzip archive: " + archiveName);
+ fileOutput = new FileOutputStream(
+ new File(outputDir + File.separator + archiveName));
+ bufOutput = new BufferedOutputStream(fileOutput);
+ gzipOutput = new GzipCompressorOutputStream(bufOutput);
+ tarOutput = new TarArchiveOutputStream(gzipOutput);
+ tarOutput.setLongFileMode(TarArchiveOutputStream.LONGFILE_GNU);
+ }
+
+ /**
+ * Writes the CBOR "Self-Describe Tag" (value 55799, serialized as 3-byte
+ * sequence of {@code 0xd9d9f7}) at the current position. This method must
+ * be used to write the CBOR magic number at the beginning of the document.
+ * Since version 2.5, <a
+ * href="https://github.com/FasterXML/jackson-dataformat-cbor"
+ * >jackson-dataformat-cbor</a> will support the {@code WRITE_TYPE_HEADER}
+ * feature to write that type tag at the beginning of the document.
+ *
+ * @param generator {@link CBORGenerator} object used to create a
CBOR-encoded document.
+ * @throws IOException if any I/O error occurs.
+ * @see <a href="https://tools.ietf.org/html/rfc7049#section-2.4.5">RFC
+ * 7049</a>
+ */
+ private void writeMagicHeader(CBORGenerator generator) throws IOException {
+ // Writes self-describe CBOR
+ // https://tools.ietf.org/html/rfc7049#section-2.4.5
+ // It will be supported in jackson-cbor since 2.5
+ byte[] header = new byte[3];
+ header[0] = (byte) 0xd9;
+ header[1] = (byte) 0xd9;
+ header[2] = (byte) 0xf7;
+ generator.writeBytes(header, 0, header.length);
+ }
+
+ private byte[] serializeCBORData(String jsonData) {
+ CBORFactory factory = new CBORFactory();
+
+ CBORGenerator generator = null;
+ ByteArrayOutputStream stream = null;
+
+ try {
+ stream = new ByteArrayOutputStream();
+ generator = factory.createGenerator(stream);
+ // Writes CBOR tag
+ writeMagicHeader(generator);
+ generator.writeString(jsonData);
+ generator.flush();
+ stream.flush();
+
+ return stream.toByteArray();
+
+ } catch (Exception e) {
+ LOG.warn("CBOR encoding failed: " + e.getMessage());
+ } finally {
+ try {
+ generator.close();
+ stream.close();
+ } catch (IOException e) {
+ // nothing to do
+ }
+ }
+
+ return null;
+ }
+
+ private void collectStats(Map<String, Integer> typeCounts, String mimeType) {
+ typeCounts.put(mimeType,
+ typeCounts.containsKey(mimeType) ? typeCounts.get(mimeType) + 1 : 1);
+ }
+
+ /**
+ * Gets the current date if the given timestamp is empty or null.
+ *
+ * @param timestamp the timestamp
+ * @return the current timestamp if the given one is null.
+ */
+ private String getDate(String timestamp) {
+ if (timestamp == null || timestamp.isEmpty()) {
+ DateFormat dateFormat = new SimpleDateFormat(
+ "EEE, d MMM yyyy HH:mm:ss z");
+ timestamp = dateFormat.format(new Date());
+ }
+ return timestamp;
+
+ }
+
+ public static String reverseUrl(String urlString) {
+ URL url;
+ String reverseKey = null;
+ try {
+ url = new URL(urlString);
+
+ String[] hostPart = url.getHost().replace('.', '/').split("/");
+
+ StringBuilder sb = new StringBuilder();
+ sb.append(hostPart[hostPart.length - 1]);
+ for (int i = hostPart.length - 2; i >= 0; i--) {
+ sb.append("/" + hostPart[i]);
+ }
- private static final Logger LOG =
LoggerFactory.getLogger(CommonCrawlDataDumper.class.getName());
-
- private CommonCrawlConfig config = null;
-
- // Gzip initialization
- private FileOutputStream fileOutput = null;
- private BufferedOutputStream bufOutput = null;
- private GzipCompressorOutputStream gzipOutput = null;
- private TarArchiveOutputStream tarOutput = null;
- private ArrayList<String> fileList = null;
-
- /**
- * Main method for invoking this tool
- *
- * @param args
- * 1) output directory (which will be created if it does not
- * already exist) to host the CBOR data and 2) a directory
- * containing one or more segments from which we wish to
generate
- * CBOR data from. Optionally, 3) a list of mimetypes and
the 4)
- * the gzip option may be provided.
- * @throws Exception
- */
- @SuppressWarnings("static-access")
- public static void main(String[] args) throws Exception {
- Option helpOpt = new Option("h", "help", false,
- "show this help message.");
- // argument options
- Option outputOpt = OptionBuilder
- .withArgName("outputDir")
- .hasArg()
- .withDescription(
- "output directory (which will
be created) to host the CBOR data.")
- .create("outputDir");
- Option segOpt = OptionBuilder
- .withArgName("segment")
- .hasArgs()
- .withDescription("the segment(s) to use")
- .create("segment");
- // create mimetype and gzip options
- Option mimeOpt = OptionBuilder
- .isRequired(false)
- .withArgName("mimetype")
- .hasArgs()
- .withDescription(
- "an optional list of mimetypes
to dump, excluding all others. Defaults to all.")
- .create("mimetype");
- Option gzipOpt = OptionBuilder
- .withArgName("gzip")
- .hasArg(false)
- .withDescription(
- "an optional flag indicating
whether to additionally gzip the data.")
- .create("gzip");
- Option keyPrefixOpt = OptionBuilder
- .withArgName("keyPrefix")
- .hasArg(true)
- .withDescription("an optional prefix for key in
the output format.")
- .create("keyPrefix");
- Option simpleDateFormatOpt = OptionBuilder
- .withArgName("SimpleDateFormat")
- .hasArg(false)
- .withDescription("an optional format for
timestamp in GMT epoch milliseconds.")
- .create("SimpleDateFormat");
- Option epochFilenameOpt = OptionBuilder
- .withArgName("epochFilename")
- .hasArg(false)
- .withDescription("an optional format for output
filename.")
- .create("epochFilename");
- Option jsonArrayOpt = OptionBuilder
- .withArgName("jsonArray")
- .hasArg(false)
- .withDescription("an optional format for JSON
output.")
- .create("jsonArray");
- Option reverseKeyOpt = OptionBuilder
- .withArgName("reverseKey")
- .hasArg(false)
- .withDescription("an optional format for key
value in JSON output.")
- .create("reverseKey");
- Option extensionOpt = OptionBuilder
- .withArgName("extension")
- .hasArg(true)
- .withDescription("an optional file extension
for output documents.")
- .create("extension");
-
- // create the options
- Options options = new Options();
- options.addOption(helpOpt);
- options.addOption(outputOpt);
- options.addOption(segOpt);
- // create mimetypes and gzip options
- options.addOption(mimeOpt);
- options.addOption(gzipOpt);
- // create keyPrefix option
- options.addOption(keyPrefixOpt);
- // create simpleDataFormat option
- options.addOption(simpleDateFormatOpt);
- options.addOption(epochFilenameOpt);
- options.addOption(jsonArrayOpt);
- options.addOption(reverseKeyOpt);
- options.addOption(extensionOpt);
-
- CommandLineParser parser = new GnuParser();
- try {
- CommandLine line = parser.parse(options, args);
- if (line.hasOption("help") ||
!line.hasOption("outputDir") || (!line.hasOption("segment"))) {
- HelpFormatter formatter = new HelpFormatter();
-
formatter.printHelp(CommonCrawlDataDumper.class.getName(), options, true);
- return;
- }
-
- File outputDir = new
File(line.getOptionValue("outputDir"));
- File segmentRootDir = new
File(line.getOptionValue("segment"));
- String[] mimeTypes = line.getOptionValues("mimetype");
- boolean gzip = line.hasOption("gzip");
- boolean epochFilename = line.hasOption("epochFilename");
-
- String keyPrefix = line.getOptionValue("keyPrefix", "");
- boolean simpleDateFormat =
line.hasOption("SimpleDateFormat");
- boolean jsonArray = line.hasOption("jsonArray");
- boolean reverseKey = line.hasOption("reverseKey");
- String extension = line.getOptionValue("extension", "");
-
- CommonCrawlConfig config = new CommonCrawlConfig();
- config.setKeyPrefix(keyPrefix);
- config.setSimpleDateFormat(simpleDateFormat);
- config.setJsonArray(jsonArray);
- config.setReverseKey(reverseKey);
-
- if (!outputDir.exists()) {
- LOG.warn("Output directory: [" +
outputDir.getAbsolutePath() + "]: does not exist, creating it.");
- if (!outputDir.mkdirs())
- throw new Exception("Unable to create:
[" + outputDir.getAbsolutePath() + "]");
- }
-
- CommonCrawlDataDumper dumper = new
CommonCrawlDataDumper(config);
-
- dumper.dump(outputDir, segmentRootDir, gzip, mimeTypes,
epochFilename, extension);
-
- } catch (Exception e) {
- LOG.error(CommonCrawlDataDumper.class.getName() + ": "
+ StringUtils.stringifyException(e));
- e.printStackTrace();
- return;
- }
- }
-
- /**
- * Constructor
- */
- public CommonCrawlDataDumper(CommonCrawlConfig config) {
- this.config = config;
- }
-
- /**
- * Dumps the reverse engineered CBOR content from the provided segment
- * directories if a parent directory contains more than one segment,
- * otherwise a single segment can be passed as an argument. If the
boolean
- * argument is provided then the CBOR is also zipped.
- *
- * @param outputDir
- * the directory you wish to dump the raw content to. This
- * directory will be created.
- * @param segmentRootDir
- * a directory containing one or more segments.
- * @param gzip
- * a boolean flag indicating whether the CBOR content should
also
- * be gzipped.
- * @param mimetypes
- * an array of mime types we have to dump, all others will be
- * filtered out.
- * @param epochFilename
- * if {@code true}, output files will be names using the epoch
time (in milliseconds).
- * @param extension
- * a file extension to use with output documents.
- * @throws Exception if any exception occurs.
- */
- public void dump(File outputDir, File segmentRootDir, boolean gzip,
String[] mimeTypes, boolean epochFilename, String extension) throws Exception {
- if (gzip) {
- LOG.info("Gzipping CBOR data has been skipped");
- }
- // total file counts
- Map<String, Integer> typeCounts = new HashMap<String,
Integer>();
- // filtered file counters
- Map<String, Integer> filteredCounts = new HashMap<String,
Integer>();
-
- Configuration nutchConfig = NutchConfiguration.create();
- FileSystem fs = FileSystem.get(nutchConfig);
- File[] segmentDirs = segmentRootDir.listFiles(new FileFilter() {
- @Override
- public boolean accept(File file) {
- return file.canRead() && file.isDirectory();
- }
- });
-
- if (segmentDirs == null) {
- LOG.error("No segment directories found in [" +
segmentRootDir.getAbsolutePath() + "]");
- System.exit(1);
- }
-
- if (gzip) {
- fileList = new ArrayList<String>();
- constructNewStream(outputDir);
- }
-
- for (File segment : segmentDirs) {
- LOG.info("Processing segment: [" +
segment.getAbsolutePath() + "]");
- try {
- String segmentContentPath =
segment.getAbsolutePath() + File.separator + Content.DIR_NAME +
"/part-00000/data";
- Path file = new Path(segmentContentPath);
-
- if (!new File(file.toString()).exists()) {
- LOG.warn("Skipping segment: [" +
segmentContentPath + "]: no data directory present");
- continue;
- }
- SequenceFile.Reader reader = new
SequenceFile.Reader(nutchConfig, SequenceFile.Reader.file(file));
-
- if (!new File(file.toString()).exists()) {
- LOG.warn("Skipping segment: [" +
segmentContentPath + "]: no data directory present");
- continue;
- }
- Writable key = (Writable)
reader.getKeyClass().newInstance();
-
- Content content = null;
-
- while (reader.next(key)) {
- content = new Content();
- reader.getCurrentValue(content);
- Metadata metadata =
content.getMetadata();
- String url = key.toString();
- String baseName =
FilenameUtils.getBaseName(url);
- String extensionName =
FilenameUtils.getExtension(url);
-
- if (!extension.isEmpty()) {
- extensionName = extension;
- }
- else if ((extensionName == null) ||
extensionName.isEmpty()) {
- extensionName = "html";
- }
-
- String outputFullPath = null;
- String outputRelativePath = null;
- String filename = null;
- String timestamp = null;
- String reverseKey = null;
-
- if (epochFilename ||
config.getReverseKey()) {
- try {
- long epoch = new
SimpleDateFormat("EEE, d MMM yyyy HH:mm:ss
z").parse(getDate(metadata.get("Date"))).getTime();
- timestamp =
String.valueOf(epoch);
- } catch (ParseException pe) {
-
LOG.warn(pe.getMessage());
- }
-
- reverseKey = reverseUrl(url);
-
config.setReverseKeyValue(reverseKey.replace("/", "_") + "_" +
DigestUtils.shaHex(url) + "_" + timestamp);
- }
-
- if (epochFilename) {
- outputFullPath =
DumpFileUtil.createFileNameFromUrl(outputDir.getAbsolutePath(), reverseKey,
url, timestamp, extensionName, !gzip);
- outputRelativePath =
outputFullPath.substring(0, outputFullPath.lastIndexOf(File.separator)-1);
- filename =
content.getMetadata().get(Metadata.DATE) + "." + extensionName;
- }
- else {
- String md5Ofurl =
DumpFileUtil.getUrlMD5(url);
- String fullDir =
DumpFileUtil.createTwoLevelsDirectory(outputDir.getAbsolutePath(), md5Ofurl,
!gzip);
- filename =
DumpFileUtil.createFileName(md5Ofurl, baseName, extensionName);
- outputFullPath =
String.format("%s/%s", fullDir, filename);
-
- String [] fullPathLevels =
fullDir.split(File.separator);
- String firstLevelDirName =
fullPathLevels[fullPathLevels.length-2];
- String secondLevelDirName =
fullPathLevels[fullPathLevels.length-1];
- outputRelativePath =
firstLevelDirName + secondLevelDirName;
- }
-
- // Encode all filetypes if no mimetypes
have been given
- Boolean filter = (mimeTypes == null);
-
- String jsonData = "";
- try {
- String mimeType = new
Tika().detect(content.getContent());
- // Maps file to JSON-based
structure
- CommonCrawlFormat format =
CommonCrawlFormatFactory.getCommonCrawlFormat("JACKSON", url,
content.getContent(), metadata, nutchConfig, config);
- jsonData = format.getJsonData();
-
- collectStats(typeCounts,
mimeType);
- // collects statistics for the
given mimetypes
- if ((mimeType != null) &&
(mimeTypes != null) && Arrays.asList(mimeTypes).contains(mimeType)) {
-
collectStats(filteredCounts, mimeType);
- filter = true;
- }
- } catch (IOException ioe) {
- LOG.error("Fatal error in
creating JSON data: " + ioe.getMessage());
- return;
- }
-
- if (filter) {
- byte[] byteData =
serializeCBORData(jsonData);
-
- if (!gzip) {
- File outputFile = new
File(outputFullPath);
- if
(outputFile.exists()) {
-
LOG.info("Skipping writing: [" + outputFullPath + "]: file already exists");
- }
- else {
-
LOG.info("Writing: [" + outputFullPath + "]");
-
IOUtils.copy(new ByteArrayInputStream(byteData), new
FileOutputStream(outputFile));
- }
- }
- else {
- if
(fileList.contains(outputFullPath)) {
-
LOG.info("Skipping compressing: [" + outputFullPath + "]: file already exists");
- }
- else {
-
fileList.add(outputFullPath);
-
LOG.info("Compressing: [" + outputFullPath + "]");
-
//TarArchiveEntry tarEntry = new TarArchiveEntry(firstLevelDirName +
File.separator + secondLevelDirName + File.separator + filename);
- TarArchiveEntry
tarEntry = new TarArchiveEntry(outputRelativePath + File.separator + filename);
-
tarEntry.setSize(byteData.length);
-
tarOutput.putArchiveEntry(tarEntry);
-
tarOutput.write(byteData);
-
tarOutput.closeArchiveEntry();
- }
- }
- }
- }
- reader.close();
- } finally {
- fs.close();
- }
- }
-
- if (gzip) {
- closeStream();
- }
-
- if (!typeCounts.isEmpty()) {
- LOG.info("CommonsCrawlDataDumper File Stats: " +
DumpFileUtil.displayFileTypes(typeCounts, filteredCounts));
- }
- }
-
- private void closeStream() {
- try {
- tarOutput.finish();
-
- tarOutput.close();
- gzipOutput.close();
- bufOutput.close();
- fileOutput.close();
- } catch (IOException ioe) {
- LOG.warn("Error in closing stream: " +
ioe.getMessage());
- }
- }
-
- private void constructNewStream(File outputDir) throws IOException {
- String archiveName = new
SimpleDateFormat("yyyyMMddhhmm'.tar.gz'").format(new Date());
- LOG.info("Creating a new gzip archive: " + archiveName);
- fileOutput = new FileOutputStream(new File(outputDir +
File.separator + archiveName));
- bufOutput = new BufferedOutputStream(fileOutput);
- gzipOutput = new GzipCompressorOutputStream(bufOutput);
- tarOutput = new TarArchiveOutputStream(gzipOutput);
- tarOutput.setLongFileMode(TarArchiveOutputStream.LONGFILE_GNU);
- }
-
- /**
- * Writes the CBOR "Self-Describe Tag" (value 55799, serialized as
3-byte
- * sequence of {@code 0xd9d9f7}) at the current position. This method
must
- * be used to write the CBOR magic number at the beginning of the
document.
- * Since version 2.5, <a
- * href="https://github.com/FasterXML/jackson-dataformat-cbor"
- * >jackson-dataformat-cbor</a> will support the {@code
WRITE_TYPE_HEADER}
- * feature to write that type tag at the beginning of the document.
- *
- * @see <a href="https://tools.ietf.org/html/rfc7049#section-2.4.5">RFC
- * 7049</a>
- * @param generator {@link CBORGenerator} object used to create a
CBOR-encoded document.
- * @throws IOException if any I/O error occurs.
- */
- private void writeMagicHeader(CBORGenerator generator) throws
IOException {
- // Writes self-describe CBOR
- // https://tools.ietf.org/html/rfc7049#section-2.4.5
- // It will be supported in jackson-cbor since 2.5
- byte[] header = new byte[3];
- header[0] = (byte) 0xd9;
- header[1] = (byte) 0xd9;
- header[2] = (byte) 0xf7;
- generator.writeBytes(header, 0, header.length);
- }
-
- private byte[] serializeCBORData(String jsonData) {
- CBORFactory factory = new CBORFactory();
-
- CBORGenerator generator = null;
- ByteArrayOutputStream stream = null;
-
- try {
- stream = new ByteArrayOutputStream();
- generator = factory.createGenerator(stream);
- // Writes CBOR tag
- writeMagicHeader(generator);
- generator.writeString(jsonData);
- generator.flush();
- stream.flush();
-
- return stream.toByteArray();
-
- } catch (Exception e) {
- LOG.warn("CBOR encoding failed: " + e.getMessage());
- } finally {
- try {
- generator.close();
- stream.close();
- } catch (IOException e) {
- // nothing to do
- }
- }
-
- return null;
- }
-
- private void collectStats(Map<String, Integer> typeCounts, String
mimeType) {
- typeCounts.put(mimeType, typeCounts.containsKey(mimeType) ?
typeCounts.get(mimeType) + 1 : 1);
- }
-
- /**
- * Gets the current date if the given timestamp is empty or null.
- * @param timestamp the timestamp
- * @return the current timestamp if the given one is null.
- */
- private String getDate(String timestamp) {
- if (timestamp == null || timestamp.isEmpty()) {
- DateFormat dateFormat = new SimpleDateFormat("EEE, d
MMM yyyy HH:mm:ss z");
- timestamp = dateFormat.format(new Date());
- }
- return timestamp;
-
- }
-
- public static String reverseUrl(String urlString) {
- URL url = null;
- String reverseKey = null;
- try {
- url = new URL(urlString);
-
- String[] hostPart = url.getHost().replace('.',
'/').split("/");
-
- StringBuilder sb = new StringBuilder();
- sb.append(hostPart[hostPart.length-1]);
- for (int i = hostPart.length-2; i >= 0; i--) {
- sb.append("/" + hostPart[i]);
- }
-
- reverseKey = sb.toString();
-
- } catch (MalformedURLException e) {
- LOG.error("Failed to parse URL: {}", urlString);
- }
-
- return reverseKey;
+ reverseKey = sb.toString();
+
+ } catch (MalformedURLException e) {
+ LOG.error("Failed to parse URL: {}", urlString);
}
+
+ return reverseKey;
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ Option helpOpt = new Option("h", "help", false, "show this help message.");
+ // argument options
+ Option outputOpt = OptionBuilder.withArgName("outputDir").hasArg()
+ .withDescription(
+ "output directory (which will be created) to host the CBOR data.")
+ .create("outputDir");
+ // WARC format
+ Option warcOpt = new Option("warc", "export to a WARC file");
+
+ Option segOpt = OptionBuilder.withArgName("segment").hasArgs()
+ .withDescription("the segment or directory containing segments to
use").create("segment");
+ // create mimetype and gzip options
+ Option mimeOpt = OptionBuilder.isRequired(false).withArgName("mimetype")
+ .hasArgs().withDescription(
+ "an optional list of mimetypes to dump, excluding all others.
Defaults to all.")
+ .create("mimetype");
+ Option gzipOpt = OptionBuilder.withArgName("gzip").hasArg(false)
+ .withDescription(
+ "an optional flag indicating whether to additionally gzip the
data.")
+ .create("gzip");
+ Option keyPrefixOpt = OptionBuilder.withArgName("keyPrefix").hasArg(true)
+ .withDescription("an optional prefix for key in the output format.")
+ .create("keyPrefix");
+ Option simpleDateFormatOpt = OptionBuilder.withArgName("SimpleDateFormat")
+ .hasArg(false).withDescription(
+ "an optional format for timestamp in GMT epoch milliseconds.")
+ .create("SimpleDateFormat");
+ Option epochFilenameOpt = OptionBuilder.withArgName("epochFilename")
+ .hasArg(false)
+ .withDescription("an optional format for output filename.")
+ .create("epochFilename");
+ Option jsonArrayOpt = OptionBuilder.withArgName("jsonArray").hasArg(false)
+ .withDescription("an optional format for JSON output.")
+ .create("jsonArray");
+ Option reverseKeyOpt =
OptionBuilder.withArgName("reverseKey").hasArg(false)
+ .withDescription("an optional format for key value in JSON output.")
+ .create("reverseKey");
+ Option extensionOpt = OptionBuilder.withArgName("extension").hasArg(true)
+ .withDescription("an optional file extension for output documents.")
+ .create("extension");
+ Option sizeOpt = OptionBuilder.withArgName("warcSize").hasArg(true)
+ .withType(Number.class)
+ .withDescription("an optional file size in bytes for the WARC file(s)")
+ .create("warcSize");
+
+ // create the options
+ Options options = new Options();
+ options.addOption(helpOpt);
+ options.addOption(outputOpt);
+ options.addOption(segOpt);
+ // create mimetypes and gzip options
+ options.addOption(warcOpt);
+ options.addOption(mimeOpt);
+ options.addOption(gzipOpt);
+ // create keyPrefix option
+ options.addOption(keyPrefixOpt);
+ // create simpleDataFormat option
+ options.addOption(simpleDateFormatOpt);
+ options.addOption(epochFilenameOpt);
+ options.addOption(jsonArrayOpt);
+ options.addOption(reverseKeyOpt);
+ options.addOption(extensionOpt);
+ options.addOption(sizeOpt);
+
+ CommandLineParser parser = new GnuParser();
+ try {
+ CommandLine line = parser.parse(options, args);
+ if (line.hasOption("help") || !line.hasOption("outputDir") || (!line
+ .hasOption("segment"))) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter
+ .printHelp(CommonCrawlDataDumper.class.getName(), options, true);
+ return 0;
+ }
+
+ File outputDir = new File(line.getOptionValue("outputDir"));
+ File segmentRootDir = new File(line.getOptionValue("segment"));
+ String[] mimeTypes = line.getOptionValues("mimetype");
+ boolean gzip = line.hasOption("gzip");
+ boolean epochFilename = line.hasOption("epochFilename");
+
+ String keyPrefix = line.getOptionValue("keyPrefix", "");
+ boolean simpleDateFormat = line.hasOption("SimpleDateFormat");
+ boolean jsonArray = line.hasOption("jsonArray");
+ boolean reverseKey = line.hasOption("reverseKey");
+ String extension = line.getOptionValue("extension", "");
+ boolean warc = line.hasOption("warc");
+ long warcSize = 0;
+
+ if (line.getParsedOptionValue("warcSize") != null) {
+ warcSize = (Long) line.getParsedOptionValue("warcSize");
+ }
+
+ CommonCrawlConfig config = new CommonCrawlConfig();
+ config.setKeyPrefix(keyPrefix);
+ config.setSimpleDateFormat(simpleDateFormat);
+ config.setJsonArray(jsonArray);
+ config.setReverseKey(reverseKey);
+ config.setCompressed(gzip);
+ config.setWarcSize(warcSize);
+ config.setOutputDir(line.getOptionValue("outputDir"));
+
+ if (!outputDir.exists()) {
+ LOG.warn("Output directory: [" + outputDir.getAbsolutePath()
+ + "]: does not exist, creating it.");
+ if (!outputDir.mkdirs())
+ throw new Exception(
+ "Unable to create: [" + outputDir.getAbsolutePath() + "]");
+ }
+
+ CommonCrawlDataDumper dumper = new CommonCrawlDataDumper(config);
+
+ dumper.dump(outputDir, segmentRootDir, gzip, mimeTypes, epochFilename,
+ extension, warc);
+
+ } catch (Exception e) {
+ LOG.error(CommonCrawlDataDumper.class.getName() + ": " + StringUtils
+ .stringifyException(e));
+ e.printStackTrace();
+ return -1;
+ }
+
+ return 0;
+ }
}
Modified: nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormat.java
URL:
http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormat.java?rev=1704594&r1=1704593&r2=1704594&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormat.java
(original)
+++ nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormat.java Tue Sep
22 12:23:31 2015
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,22 +17,55 @@
package org.apache.nutch.tools;
+import org.apache.nutch.metadata.Metadata;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.protocol.Content;
+
import java.io.IOException;
/**
* Interface for all CommonCrawl formatter. It provides the signature for the
* method used to get JSON data.
- *
+ *
* @author gtotaro
*
*/
public interface CommonCrawlFormat {
- /**
- *
- * @param mapAll If {@code true} maps all metdata on the JSON structure.
- * @return the JSON data
- */
- //public String getJsonData(boolean mapAll) throws IOException;
- public String getJsonData() throws IOException;
+ /**
+ *
+   * @param mapAll If {@code true} maps all metadata on the JSON structure.
+ * @return the JSON data
+ */
+ //public String getJsonData(boolean mapAll) throws IOException;
+ public String getJsonData() throws IOException;
+
+ /**
+ * Returns a string representation of the JSON structure of the URL content
+ *
+ * @param url
+ * @param content
+ * @param metadata
+ * @return
+ */
+ public String getJsonData(String url, Content content, Metadata metadata)
+ throws IOException;
+
+ /**
+   * Returns a string representation of the JSON structure of the URL content,
+   * taking into account the parsed metadata about the URL
+ *
+ * @param url
+ * @param content
+ * @param metadata
+ * @return
+ */
+ public String getJsonData(String url, Content content, Metadata metadata,
+ ParseData parseData) throws IOException;
+
+ /**
+ * Optional method that could be implemented if the actual format needs some
+ * close procedure.
+ */
+ public abstract void close();
}
Modified:
nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormatFactory.java
URL:
http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormatFactory.java?rev=1704594&r1=1704593&r2=1704594&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormatFactory.java
(original)
+++ nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormatFactory.java
Tue Sep 22 12:23:31 2015
@@ -21,6 +21,7 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.nutch.metadata.Metadata;
+import org.apache.nutch.protocol.Content;
/**
* Factory class that creates new {@see CommonCrawlFormat} objects (a.k.a.
formatter) that map crawled files to CommonCrawl format.
@@ -38,8 +39,9 @@ public class CommonCrawlFormatFactory {
* @param config the CommonCrawl output configuration.
* @return the new {@see CommonCrawlFormat} object.
* @throws IOException If any I/O error occurs.
+ * @deprecated
*/
- public static CommonCrawlFormat getCommonCrawlFormat(String formatType,
String url, byte[] content, Metadata metadata, Configuration nutchConf,
CommonCrawlConfig config) throws IOException {
+ public static CommonCrawlFormat getCommonCrawlFormat(String formatType,
String url, Content content, Metadata metadata, Configuration nutchConf,
CommonCrawlConfig config) throws IOException {
if (formatType == null) {
return null;
}
@@ -56,4 +58,17 @@ public class CommonCrawlFormatFactory {
return null;
}
+
+ // The format should not depend on variable attributes, essentially this
+ // should be one for the full job
+ public static CommonCrawlFormat getCommonCrawlFormat(String formatType,
Configuration nutchConf, CommonCrawlConfig config) throws IOException {
+ if (formatType.equalsIgnoreCase("WARC")) {
+ return new CommonCrawlFormatWARC(nutchConf, config);
+ }
+
+ if (formatType.equalsIgnoreCase("JACKSON")) {
+ return new CommonCrawlFormatJackson( nutchConf, config);
+ }
+ return null;
+ }
}
Modified:
nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormatJackson.java
URL:
http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormatJackson.java?rev=1704594&r1=1704593&r2=1704594&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormatJackson.java
(original)
+++ nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormatJackson.java
Tue Sep 22 12:23:31 2015
@@ -25,6 +25,7 @@ import org.apache.nutch.metadata.Metadat
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.nutch.protocol.Content;
/**
* This class provides methods to map crawled data on JSON using Jackson
Streaming APIs.
@@ -36,8 +37,18 @@ public class CommonCrawlFormatJackson ex
private JsonGenerator generator;
+ public CommonCrawlFormatJackson(Configuration nutchConf,
+ CommonCrawlConfig config) throws IOException {
+ super(null, null, null, nutchConf, config);
+
+ JsonFactory factory = new JsonFactory();
+ this.out = new ByteArrayOutputStream();
+ this.generator = factory.createGenerator(out);
+
+ this.generator.useDefaultPrettyPrinter(); // INDENTED OUTPUT
+ }
- public CommonCrawlFormatJackson(String url, byte[] content, Metadata
metadata, Configuration nutchConf, CommonCrawlConfig config) throws IOException
{
+ public CommonCrawlFormatJackson(String url, Content content, Metadata
metadata, Configuration nutchConf, CommonCrawlConfig config) throws IOException
{
super(url, content, metadata, nutchConf, config);
JsonFactory factory = new JsonFactory();
Modified:
nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormatJettinson.java
URL:
http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormatJettinson.java?rev=1704594&r1=1704593&r2=1704594&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormatJettinson.java
(original)
+++ nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormatJettinson.java
Tue Sep 22 12:23:31 2015
@@ -23,6 +23,7 @@ import java.util.Deque;
import org.apache.hadoop.conf.Configuration;
import org.apache.nutch.metadata.Metadata;
+import org.apache.nutch.protocol.Content;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
@@ -37,7 +38,7 @@ public class CommonCrawlFormatJettinson
private Deque<JSONArray> stackArrays;
- public CommonCrawlFormatJettinson(String url, byte[] content, Metadata
metadata, Configuration nutchConf, CommonCrawlConfig config) throws IOException
{
+ public CommonCrawlFormatJettinson(String url, Content content, Metadata
metadata, Configuration nutchConf, CommonCrawlConfig config) throws IOException
{
super(url, content, metadata, nutchConf, config);
stackObjects = new ArrayDeque<JSONObject>();
Modified:
nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormatSimple.java
URL:
http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormatSimple.java?rev=1704594&r1=1704593&r2=1704594&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormatSimple.java
(original)
+++ nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormatSimple.java
Tue Sep 22 12:23:31 2015
@@ -21,6 +21,7 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.nutch.metadata.Metadata;
+import org.apache.nutch.protocol.Content;
/**
* This class provides methods to map crawled data on JSON using a {@see
StringBuilder} object.
@@ -32,7 +33,7 @@ public class CommonCrawlFormatSimple ext
private int tabCount;
- public CommonCrawlFormatSimple(String url, byte[] content, Metadata
metadata, Configuration nutchConf, CommonCrawlConfig config) throws IOException
{
+ public CommonCrawlFormatSimple(String url, Content content, Metadata
metadata, Configuration nutchConf, CommonCrawlConfig config) throws IOException
{
super(url, content, metadata, nutchConf, config);
this.sb = new StringBuilder();
Added: nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormatWARC.java
URL:
http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormatWARC.java?rev=1704594&view=auto
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormatWARC.java
(added)
+++ nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormatWARC.java Tue
Sep 22 12:23:31 2015
@@ -0,0 +1,286 @@
+package org.apache.nutch.tools;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.text.ParseException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.ibm.icu.text.SimpleDateFormat;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.metadata.Metadata;
+import org.apache.nutch.parse.ParseData;
+
+import org.apache.nutch.parse.ParseSegment;
+import org.apache.nutch.protocol.Content;
+import org.archive.format.warc.WARCConstants;
+import org.archive.io.WriterPoolMember;
+import org.archive.io.warc.WARCRecordInfo;
+import org.archive.io.warc.WARCWriter;
+import org.archive.io.warc.WARCWriterPoolSettingsData;
+import org.archive.uid.UUIDGenerator;
+import org.archive.util.DateUtils;
+import org.archive.util.anvl.ANVLRecord;
+
+public class CommonCrawlFormatWARC extends AbstractCommonCrawlFormat {
+
+ public static final String MAX_WARC_FILE_SIZE = "warc.file.size.max";
+ public static final String TEMPLATE = "${prefix}-${timestamp17}-${serialno}";
+
+ private static final AtomicInteger SERIALNO = new AtomicInteger();
+ private final static UUIDGenerator GENERATOR = new UUIDGenerator();
+
+ private String outputDir = null;
+ private ByteArrayOutputStream out;
+ private WARCWriter writer;
+ private ParseData parseData;
+
+ public CommonCrawlFormatWARC(Configuration nutchConf,
+ CommonCrawlConfig config) throws IOException {
+ super(null, null, null, nutchConf, config);
+
+ this.out = new ByteArrayOutputStream();
+
+ ANVLRecord info = WARCUtils.getWARCInfoContent(nutchConf);
+ List<String> md = Collections.singletonList(info.toString());
+
+ this.outputDir = config.getOutputDir();
+
+ if (null == outputDir) {
+ String message = "Missing output directory configuration: " + outputDir;
+
+ throw new RuntimeException(message);
+ }
+
+ File file = new File(outputDir);
+
+ long maxSize = WARCConstants.DEFAULT_MAX_WARC_FILE_SIZE;
+
+ if (config.getWarcSize() > 0) {
+ maxSize = config.getWarcSize();
+ }
+
+ WARCWriterPoolSettingsData settings = new WARCWriterPoolSettingsData(
+ WriterPoolMember.DEFAULT_PREFIX, TEMPLATE, maxSize,
+ config.isCompressed(), Arrays.asList(new File[] { file }), md,
+ new UUIDGenerator());
+
+ writer = new WARCWriter(SERIALNO, settings);
+ }
+
+ public CommonCrawlFormatWARC(String url, Content content, Metadata metadata,
+ Configuration nutchConf, CommonCrawlConfig config, ParseData parseData)
+ throws IOException {
+ super(url, content, metadata, nutchConf, config);
+
+ this.out = new ByteArrayOutputStream();
+ this.parseData = parseData;
+
+ ANVLRecord info = WARCUtils.getWARCInfoContent(conf);
+ List<String> md = Collections.singletonList(info.toString());
+
+ this.outputDir = config.getOutputDir();
+
+ if (null == outputDir) {
+ String message = "Missing output directory configuration: " + outputDir;
+
+ throw new RuntimeException(message);
+ }
+
+ File file = new File(outputDir);
+
+ long maxSize = WARCConstants.DEFAULT_MAX_WARC_FILE_SIZE;
+
+ if (config.getWarcSize() > 0) {
+ maxSize = config.getWarcSize();
+ }
+
+ WARCWriterPoolSettingsData settings = new WARCWriterPoolSettingsData(
+ WriterPoolMember.DEFAULT_PREFIX, TEMPLATE, maxSize,
+ config.isCompressed(), Arrays.asList(new File[] { file }), md,
+ new UUIDGenerator());
+
+ writer = new WARCWriter(SERIALNO, settings);
+ }
+
+ public String getJsonData(String url, Content content, Metadata metadata,
+ ParseData parseData) throws IOException {
+ this.url = url;
+ this.content = content;
+ this.metadata = metadata;
+ this.parseData = parseData;
+
+ return this.getJsonData();
+ }
+
+ @Override
+ public String getJsonData() throws IOException {
+
+ long position = writer.getPosition();
+
+ try {
+ // See if we need to open a new file because we've exceeded maxBytes
+
+ // checkSize will open a new file if we exceeded the maxBytes setting
+ writer.checkSize();
+
+ if (writer.getPosition() != position) {
+ // We just closed the file because it was larger than maxBytes.
+ position = writer.getPosition();
+ }
+
+ // response record
+ URI id = writeResponse();
+
+ if (StringUtils.isNotBlank(metadata.get("_request_"))) {
+ // write the request method if any request info is found
+ writeRequest(id);
+ }
+ } catch (IOException e) {
+ // Launch the corresponding IO error
+ throw e;
+ } catch (ParseException e) {
+      // do nothing, as we can't establish a valid WARC-Date for this record
+      // let's skip it altogether
+ LOG.error("Can't get a valid date from: {}", url);
+ }
+
+ return null;
+ }
+
+ protected URI writeResponse() throws IOException, ParseException {
+ WARCRecordInfo record = new WARCRecordInfo();
+
+ record.setType(WARCConstants.WARCRecordType.response);
+ record.setUrl(getUrl());
+
+ String fetchTime;
+
+ record.setCreate14DigitDate(DateUtils
+ .getLog14Date(Long.parseLong(metadata.get("nutch.fetch.time"))));
+ record.setMimetype(WARCConstants.HTTP_RESPONSE_MIMETYPE);
+ record.setRecordId(GENERATOR.getRecordID());
+
+ String IP = getResponseAddress();
+
+ if (StringUtils.isNotBlank(IP))
+ record.addExtraHeader(WARCConstants.HEADER_KEY_IP, IP);
+
+ if (ParseSegment.isTruncated(content))
+ record.addExtraHeader(WARCConstants.HEADER_KEY_TRUNCATED, "unspecified");
+
+ ByteArrayOutputStream output = new ByteArrayOutputStream();
+
+ String httpHeaders = metadata.get("_response.headers_");
+
+ if (StringUtils.isNotBlank(httpHeaders)) {
+ output.write(httpHeaders.getBytes());
+ } else {
+      // change the record type to resource as we do not have information about
+      // the headers
+ record.setType(WARCConstants.WARCRecordType.resource);
+ record.setMimetype(content.getContentType());
+ }
+
+ output.write(getResponseContent().getBytes());
+
+ record.setContentLength(output.size());
+ record.setContentStream(new ByteArrayInputStream(output.toByteArray()));
+
+ if (output.size() > 0) {
+ // avoid generating a 0 sized record, as the webarchive library will
+ // complain about it
+ writer.writeRecord(record);
+ }
+
+ return record.getRecordId();
+ }
+
+ protected URI writeRequest(URI id) throws IOException, ParseException {
+ WARCRecordInfo record = new WARCRecordInfo();
+
+ record.setType(WARCConstants.WARCRecordType.request);
+ record.setUrl(getUrl());
+ record.setCreate14DigitDate(DateUtils
+ .getLog14Date(Long.parseLong(metadata.get("nutch.fetch.time"))));
+ record.setMimetype(WARCConstants.HTTP_REQUEST_MIMETYPE);
+ record.setRecordId(GENERATOR.getRecordID());
+
+ if (id != null) {
+ ANVLRecord headers = new ANVLRecord();
+ headers.addLabelValue(WARCConstants.HEADER_KEY_CONCURRENT_TO,
+ '<' + id.toString() + '>');
+ record.setExtraHeaders(headers);
+ }
+
+ ByteArrayOutputStream output = new ByteArrayOutputStream();
+
+ output.write(metadata.get("_request_").getBytes());
+ record.setContentLength(output.size());
+ record.setContentStream(new ByteArrayInputStream(output.toByteArray()));
+
+ writer.writeRecord(record);
+
+ return record.getRecordId();
+ }
+
+ @Override
+ protected String generateJson() throws IOException {
+ return null;
+ }
+
+ @Override
+ protected void writeKeyValue(String key, String value) throws IOException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ protected void writeKeyNull(String key) throws IOException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ protected void startArray(String key, boolean nested, boolean newline)
+ throws IOException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ protected void closeArray(String key, boolean nested, boolean newline)
+ throws IOException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ protected void writeArrayValue(String value) throws IOException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ protected void startObject(String key) throws IOException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ protected void closeObject(String key) throws IOException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public void close() {
+ if (writer != null)
+ try {
+ writer.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
Added: nutch/trunk/src/java/org/apache/nutch/tools/WARCUtils.java
URL:
http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/tools/WARCUtils.java?rev=1704594&view=auto
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/tools/WARCUtils.java (added)
+++ nutch/trunk/src/java/org/apache/nutch/tools/WARCUtils.java Tue Sep 22
12:23:31 2015
@@ -0,0 +1,154 @@
+package org.apache.nutch.tools;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.indexer.NutchDocument;
+import org.apache.nutch.util.StringUtil;
+import org.archive.format.http.HttpHeaders;
+import org.archive.format.warc.WARCConstants;
+import org.archive.io.warc.WARCRecordInfo;
+import org.archive.uid.UUIDGenerator;
+import org.archive.util.DateUtils;
+import org.archive.util.anvl.ANVLRecord;
+
+public class WARCUtils {
+ public final static String SOFTWARE = "software";
+ public final static String HTTP_HEADER_FROM = "http-header-from";
+ public final static String HTTP_HEADER_USER_AGENT =
"http-header-user-agent";
+ public final static String HOSTNAME = "hostname";
+ public final static String ROBOTS = "robots";
+ public final static String OPERATOR = "operator";
+ public final static String FORMAT = "format";
+ public final static String CONFORMS_TO = "conformsTo";
+ public final static String IP = "ip";
+ public final static UUIDGenerator generator = new UUIDGenerator();
+
+ public static final ANVLRecord getWARCInfoContent(Configuration conf) {
+ ANVLRecord record = new ANVLRecord();
+
+ // informative headers
+ record.addLabelValue(FORMAT, "WARC File Format 1.0");
+ record.addLabelValue(CONFORMS_TO,
"http://bibnum.bnf.fr/WARC/WARC_ISO_28500_version1_latestdraft.pdf");
+
+ record.addLabelValue(SOFTWARE, conf.get("http.agent.name", ""));
+ record.addLabelValue(HTTP_HEADER_USER_AGENT,
+ getAgentString(conf.get("http.agent.name", ""),
+ conf.get("http.agent.version", ""),
+ conf.get("http.agent.description", ""),
+ conf.get("http.agent.url", ""),
+ conf.get("http.agent.email", "")));
+ record.addLabelValue(HTTP_HEADER_FROM,
+ conf.get("http.agent.email", ""));
+
+ try {
+ record.addLabelValue(HOSTNAME, getHostname(conf));
+ record.addLabelValue(IP, getIPAddress(conf));
+ } catch (UnknownHostException ignored) {
+ // do nothing as this fields are optional
+ }
+
+ record.addLabelValue(ROBOTS, "classic"); // TODO Make configurable?
+ record.addLabelValue(OPERATOR, conf.get("http.agent.email", ""));
+
+ return record;
+ }
+
+ public static final String getHostname(Configuration conf)
+ throws UnknownHostException {
+
+ return StringUtil.isEmpty(conf.get("http.agent.host", "")) ?
+ InetAddress.getLocalHost().getHostName() :
+ conf.get("http.agent.host");
+ }
+
+ public static final String getIPAddress(Configuration conf)
+ throws UnknownHostException {
+
+ return InetAddress.getLocalHost().getHostAddress();
+ }
+
+ public static final byte[] toByteArray(HttpHeaders headers)
+ throws IOException {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ headers.write(out);
+
+ return out.toByteArray();
+ }
+
+ public static final String getAgentString(String name, String version,
+ String description, String URL, String email) {
+
+ StringBuffer buf = new StringBuffer();
+
+ buf.append(name);
+
+ if (version != null) {
+ buf.append("/").append(version);
+ }
+
+ if (((description != null) && (description.length() != 0)) || (
+ (email != null) && (email.length() != 0)) || ((URL != null) &&
(
+ URL.length() != 0))) {
+ buf.append(" (");
+
+ if ((description != null) && (description.length() != 0)) {
+ buf.append(description);
+ if ((URL != null) || (email != null))
+ buf.append("; ");
+ }
+
+ if ((URL != null) && (URL.length() != 0)) {
+ buf.append(URL);
+ if (email != null)
+ buf.append("; ");
+ }
+
+ if ((email != null) && (email.length() != 0))
+ buf.append(email);
+
+ buf.append(")");
+ }
+
+ return buf.toString();
+ }
+
+ public static final WARCRecordInfo docToMetadata(NutchDocument doc)
+ throws UnsupportedEncodingException {
+ WARCRecordInfo record = new WARCRecordInfo();
+
+ record.setType(WARCConstants.WARCRecordType.metadata);
+ record.setUrl((String) doc.getFieldValue("id"));
+ record.setCreate14DigitDate(
+ DateUtils.get14DigitDate((Date) doc.getFieldValue("tstamp")));
+ record.setMimetype("application/warc-fields");
+ record.setRecordId(generator.getRecordID());
+
+ // metadata
+ ANVLRecord metadata = new ANVLRecord();
+
+ for (String field : doc.getFieldNames()) {
+ List<Object> values = doc.getField(field).getValues();
+ for (Object value : values) {
+ if (value instanceof Date) {
+ metadata.addLabelValue(field, DateUtils.get14DigitDate());
+ } else {
+ metadata.addLabelValue(field, (String) value);
+ }
+ }
+ }
+
+ record.setContentLength(metadata.getLength());
+ record.setContentStream(
+ new ByteArrayInputStream(metadata.getUTF8Bytes()));
+
+ return record;
+ }
+}
Modified:
nutch/trunk/src/plugin/protocol-http/src/java/org/apache/nutch/protocol/http/HttpResponse.java
URL:
http://svn.apache.org/viewvc/nutch/trunk/src/plugin/protocol-http/src/java/org/apache/nutch/protocol/http/HttpResponse.java?rev=1704594&r1=1704593&r2=1704594&view=diff
==============================================================================
---
nutch/trunk/src/plugin/protocol-http/src/java/org/apache/nutch/protocol/http/HttpResponse.java
(original)
+++
nutch/trunk/src/plugin/protocol-http/src/java/org/apache/nutch/protocol/http/HttpResponse.java
Tue Sep 22 12:23:31 2015
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -43,7 +43,9 @@ import org.apache.nutch.protocol.Protoco
import org.apache.nutch.protocol.http.api.HttpBase;
import org.apache.nutch.protocol.http.api.HttpException;
-/** An HTTP response. */
+/**
+ * An HTTP response.
+ */
public class HttpResponse implements Response {
private Configuration conf;
@@ -54,6 +56,8 @@ public class HttpResponse implements Res
private byte[] content;
private int code;
private Metadata headers = new SpellCheckedMetadata();
+ // used for storing the http headers verbatim
+ private StringBuffer httpHeaders;
protected enum Scheme {
HTTP, HTTPS,
@@ -61,7 +65,7 @@ public class HttpResponse implements Res
/**
* Default public constructor.
- *
+ *
* @param http
* @param url
* @param datum
@@ -125,24 +129,24 @@ public class HttpResponse implements Res
if (scheme == Scheme.HTTPS) {
SSLSocketFactory factory = (SSLSocketFactory) SSLSocketFactory
.getDefault();
- SSLSocket sslsocket = (SSLSocket) factory.createSocket(socket,
- sockHost, sockPort, true);
+ SSLSocket sslsocket = (SSLSocket) factory
+ .createSocket(socket, sockHost, sockPort, true);
sslsocket.setUseClientMode(true);
// Get the protocols and ciphers supported by this JVM
- Set<String> protocols = new HashSet<String>(Arrays.asList(sslsocket
- .getSupportedProtocols()));
- Set<String> ciphers = new HashSet<String>(Arrays.asList(sslsocket
- .getSupportedCipherSuites()));
+ Set<String> protocols = new HashSet<String>(
+ Arrays.asList(sslsocket.getSupportedProtocols()));
+ Set<String> ciphers = new HashSet<String>(
+ Arrays.asList(sslsocket.getSupportedCipherSuites()));
// Intersect with preferred protocols and ciphers
protocols.retainAll(http.getTlsPreferredProtocols());
ciphers.retainAll(http.getTlsPreferredCipherSuites());
- sslsocket.setEnabledProtocols(protocols.toArray(new String[protocols
- .size()]));
- sslsocket.setEnabledCipherSuites(ciphers.toArray(new String[ciphers
- .size()]));
+ sslsocket.setEnabledProtocols(
+ protocols.toArray(new String[protocols.size()]));
+ sslsocket.setEnabledCipherSuites(
+ ciphers.toArray(new String[ciphers.size()]));
sslsocket.startHandshake();
socket = sslsocket;
@@ -193,34 +197,54 @@ public class HttpResponse implements Res
reqStr.append("\r\n");
if (http.isIfModifiedSinceEnabled() && datum.getModifiedTime() > 0) {
- reqStr.append("If-Modified-Since: "
- + HttpDateFormat.toString(datum.getModifiedTime()));
+ reqStr.append("If-Modified-Since: " + HttpDateFormat
+ .toString(datum.getModifiedTime()));
reqStr.append("\r\n");
}
reqStr.append("\r\n");
+ // store the request in the metadata?
+ if (conf.getBoolean("store.http.request", false) == true) {
+ headers.add("_request_", reqStr.toString());
+ }
+
byte[] reqBytes = reqStr.toString().getBytes();
req.write(reqBytes);
req.flush();
PushbackInputStream in = // process response
- new PushbackInputStream(new BufferedInputStream(socket.getInputStream(),
- Http.BUFFER_SIZE), Http.BUFFER_SIZE);
+ new PushbackInputStream(
+ new BufferedInputStream(socket.getInputStream(),
+ Http.BUFFER_SIZE), Http.BUFFER_SIZE);
StringBuffer line = new StringBuffer();
+ // store the http headers verbatim
+ if (conf.getBoolean("store.http.headers", false) == true) {
+ httpHeaders = new StringBuffer();
+ }
+
+ headers.add("nutch.fetch.time",
Long.toString(System.currentTimeMillis()));
+
boolean haveSeenNonContinueStatus = false;
while (!haveSeenNonContinueStatus) {
// parse status code line
this.code = parseStatusLine(in, line);
+ if (httpHeaders != null)
+ httpHeaders.append(line).append("\n");
// parse headers
- parseHeaders(in, line);
+ parseHeaders(in, line, httpHeaders);
haveSeenNonContinueStatus = code != 100; // 100 is "Continue"
}
+
+ if (httpHeaders != null) {
+ headers.add("_response.headers_", httpHeaders.toString());
+ }
+
String transferEncoding = getHeader(Response.TRANSFER_ENCODING);
- if (transferEncoding != null
- && "chunked".equalsIgnoreCase(transferEncoding.trim())) {
+ if (transferEncoding != null && "chunked"
+ .equalsIgnoreCase(transferEncoding.trim())) {
readChunkedContent(in, line);
} else {
readPlainContent(in);
@@ -274,8 +298,8 @@ public class HttpResponse implements Res
* -------------------------
*/
- private void readPlainContent(InputStream in) throws HttpException,
- IOException {
+ private void readPlainContent(InputStream in)
+ throws HttpException, IOException {
int contentLength = Integer.MAX_VALUE; // get content length
String contentLengthString = headers.get(Response.CONTENT_LENGTH);
@@ -288,9 +312,10 @@ public class HttpResponse implements Res
throw new HttpException("bad content length: " + contentLengthString);
}
}
- if (http.getMaxContent() >= 0 && contentLength > http.getMaxContent()) //
limit
- //
download
- //
size
+ if (http.getMaxContent() >= 0 && contentLength > http
+ .getMaxContent()) // limit
+ // download
+ // size
contentLength = http.getMaxContent();
ByteArrayOutputStream out = new ByteArrayOutputStream(Http.BUFFER_SIZE);
@@ -323,7 +348,6 @@ public class HttpResponse implements Res
}
/**
- *
* @param in
* @param line
* @throws HttpException
@@ -368,16 +392,17 @@ public class HttpResponse implements Res
break;
}
- if (http.getMaxContent() >= 0
- && (contentBytesRead + chunkLen) > http.getMaxContent())
+ if (http.getMaxContent() >= 0 && (contentBytesRead + chunkLen) > http
+ .getMaxContent())
chunkLen = http.getMaxContent() - contentBytesRead;
// read one chunk
int chunkBytesRead = 0;
while (chunkBytesRead < chunkLen) {
- int toRead = (chunkLen - chunkBytesRead) < Http.BUFFER_SIZE ?
(chunkLen - chunkBytesRead)
- : Http.BUFFER_SIZE;
+ int toRead = (chunkLen - chunkBytesRead) < Http.BUFFER_SIZE ?
+ (chunkLen - chunkBytesRead) :
+ Http.BUFFER_SIZE;
int len = in.read(bytes, 0, toRead);
if (len == -1)
@@ -405,7 +430,7 @@ public class HttpResponse implements Res
}
content = out.toByteArray();
- parseHeaders(in, line);
+ parseHeaders(in, line, null);
}
@@ -425,15 +450,15 @@ public class HttpResponse implements Res
try {
code = Integer.parseInt(line.substring(codeStart + 1, codeEnd));
} catch (NumberFormatException e) {
- throw new HttpException("bad status line '" + line + "': "
- + e.getMessage(), e);
+ throw new HttpException(
+ "bad status line '" + line + "': " + e.getMessage(), e);
}
return code;
}
- private void processHeaderLine(StringBuffer line) throws IOException,
- HttpException {
+ private void processHeaderLine(StringBuffer line)
+ throws IOException, HttpException {
int colonIndex = line.indexOf(":"); // key is up to colon
if (colonIndex == -1) {
@@ -459,16 +484,19 @@ public class HttpResponse implements Res
}
// Adds headers to our headers Metadata
- private void parseHeaders(PushbackInputStream in, StringBuffer line)
- throws IOException, HttpException {
+ private void parseHeaders(PushbackInputStream in, StringBuffer line,
+ StringBuffer httpHeaders) throws IOException, HttpException {
while (readLine(in, line, true) != 0) {
+ if (httpHeaders != null)
+ httpHeaders.append(line).append("\n");
+
// handle HTTP responses with missing blank line after headers
int pos;
- if (((pos = line.indexOf("<!DOCTYPE")) != -1)
- || ((pos = line.indexOf("<HTML")) != -1)
- || ((pos = line.indexOf("<html")) != -1)) {
+ if (((pos = line.indexOf("<!DOCTYPE")) != -1) || (
+ (pos = line.indexOf("<HTML")) != -1) || ((pos =
line.indexOf("<html"))
+ != -1)) {
in.unread(line.substring(pos).getBytes("UTF-8"));
line.setLength(pos);
@@ -526,4 +554,4 @@ public class HttpResponse implements Res
return value;
}
-}
+}
\ No newline at end of file