Author: jorgelbg
Date: Tue Sep 22 12:23:31 2015
New Revision: 1704594

URL: http://svn.apache.org/viewvc?rev=1704594&view=rev
Log:
NUTCH-2095 WARC exporter for the CommonCrawlDataDumper


Added:
    nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormatWARC.java
    nutch/trunk/src/java/org/apache/nutch/tools/WARCUtils.java
Modified:
    nutch/trunk/conf/nutch-default.xml
    nutch/trunk/ivy/ivy.xml
    nutch/trunk/src/java/org/apache/nutch/tools/AbstractCommonCrawlFormat.java
    nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlConfig.java
    nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlDataDumper.java
    nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormat.java
    nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormatFactory.java
    nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormatJackson.java
    nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormatJettinson.java
    nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormatSimple.java
    
nutch/trunk/src/plugin/protocol-http/src/java/org/apache/nutch/protocol/http/HttpResponse.java

Modified: nutch/trunk/conf/nutch-default.xml
URL: http://svn.apache.org/viewvc/nutch/trunk/conf/nutch-default.xml?rev=1704594&r1=1704593&r2=1704594&view=diff
==============================================================================
--- nutch/trunk/conf/nutch-default.xml (original)
+++ nutch/trunk/conf/nutch-default.xml Tue Sep 22 12:23:31 2015
@@ -1878,4 +1878,22 @@ CAUTION: Set the parser.timeout to -1 or
   </description>
 </property>
 
+<property>
+  <name>store.http.request</name>
+  <value>false</value>
+  <description>
+    Store the raw request made by Nutch, required to use the CommonCrawlDataDumper
+    tool for the WARC format.
+  </description>
+</property>
+
+<property>
+  <name>store.http.headers</name>
+  <value>false</value>
+  <description>
+    Store the raw headers received by Nutch from the server, required to use the
+    CommonCrawlDataDumper tool for the WARC format.
+  </description>
+</property>
+
 </configuration>

Modified: nutch/trunk/ivy/ivy.xml
URL: http://svn.apache.org/viewvc/nutch/trunk/ivy/ivy.xml?rev=1704594&r1=1704593&r2=1704594&view=diff
==============================================================================
--- nutch/trunk/ivy/ivy.xml (original)
+++ nutch/trunk/ivy/ivy.xml Tue Sep 22 12:23:31 2015
@@ -81,6 +81,11 @@
         <dependency org="com.fasterxml.jackson.dataformat" 
name="jackson-dataformat-cbor" rev="2.5.1" conf="*->default"/>
         <dependency org="com.fasterxml.jackson.jaxrs" 
name="jackson-jaxrs-json-provider" rev="2.5.1" conf="*->default"/>       
               
+               <!-- WARC artifacts needed -->
+               <dependency org="org.netpreserve.commons" name="webarchive-commons" rev="1.1.5" conf="*->default">
+                       <exclude module="hadoop-core"/>
+               </dependency>
+
                <!--artifacts needed for testing -->
                <dependency org="junit" name="junit" rev="4.11" 
conf="test->default" />
                <!--dependency org="org.apache.hadoop" name="hadoop-test" 
rev="1.2.0" conf="test->default" /-->

Modified: 
nutch/trunk/src/java/org/apache/nutch/tools/AbstractCommonCrawlFormat.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/tools/AbstractCommonCrawlFormat.java?rev=1704594&r1=1704593&r2=1704594&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/tools/AbstractCommonCrawlFormat.java 
(original)
+++ nutch/trunk/src/java/org/apache/nutch/tools/AbstractCommonCrawlFormat.java 
Tue Sep 22 12:23:31 2015
@@ -18,12 +18,19 @@
 package org.apache.nutch.tools;
 
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.net.InetAddress;
+import java.net.URLEncoder;
 import java.net.UnknownHostException;
 import java.text.ParseException;
 
+import org.apache.commons.httpclient.URIException;
+import org.apache.commons.httpclient.util.URIUtil;
+import org.apache.commons.lang.NotImplementedException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.nutch.metadata.Metadata;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.protocol.Content;
 import org.apache.nutch.util.URLUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,32 +42,32 @@ import com.ibm.icu.text.SimpleDateFormat
  *
  */
 public abstract class AbstractCommonCrawlFormat implements CommonCrawlFormat {
-       private static final Logger LOG = LoggerFactory.getLogger(AbstractCommonCrawlFormat.class.getName());
-       
+       protected static final Logger LOG = LoggerFactory.getLogger(AbstractCommonCrawlFormat.class.getName());
+
        protected String url;
-       
-       protected byte[] content;
-       
+
+       protected Content content;
+
        protected Metadata metadata;
-       
+
        protected Configuration conf;
-       
+
        protected String keyPrefix;
-       
+
        protected boolean simpleDateFormat;
-       
+
        protected boolean jsonArray;
-       
+
        protected boolean reverseKey;
-       
+
        protected String reverseKeyValue;
 
-       public AbstractCommonCrawlFormat(String url, byte[] content, Metadata metadata, Configuration nutchConf, CommonCrawlConfig config) throws IOException {
+       public AbstractCommonCrawlFormat(String url, Content content, Metadata metadata, Configuration nutchConf, CommonCrawlConfig config) throws IOException {
                this.url = url;
                this.content = content;
                this.metadata = metadata;
                this.conf = nutchConf;
-               
+
                this.keyPrefix = config.getKeyPrefix();
                this.simpleDateFormat = config.getSimpleDateFormat();
                this.jsonArray = config.getJsonArray();
@@ -68,17 +75,33 @@ public abstract class AbstractCommonCraw
                this.reverseKeyValue = config.getReverseKeyValue();
        }
 
+       public String getJsonData(String url, Content content, Metadata 
metadata)
+      throws IOException {
+               this.url = url;
+               this.content = content;
+               this.metadata = metadata;
+
+               return this.getJsonData();
+       }
+
+       public String getJsonData(String url, Content content, Metadata 
metadata,
+                       ParseData parseData) throws IOException {
+
+    // override of this is required in the actual formats
+               throw new NotImplementedException();
+       }
+
        @Override
        public String getJsonData() throws IOException {
                try {
                        startObject(null);
-                       
+
                        // url
                        writeKeyValue("url", getUrl());
-                       
+
                        // timestamp
                        writeKeyValue("timestamp", getTimestamp());
-                       
+
                        // request
                        startObject("request");
                        writeKeyValue("method", getMethod());
@@ -102,7 +125,7 @@ public abstract class AbstractCommonCraw
                        closeHeaders("headers", false, true);
                        writeKeyNull("body");
                        closeObject("request");
-                       
+
                        // response
                        startObject("response");
                        writeKeyValue("status", getResponseStatus());
@@ -125,50 +148,56 @@ public abstract class AbstractCommonCraw
                        closeHeaders("headers", false, true);
                        writeKeyValue("body", getResponseContent());
                        closeObject("response");
-                       
+
                        // key
                        if (!this.keyPrefix.isEmpty()) {
                                this.keyPrefix += "-";
                        }
                        writeKeyValue("key", this.keyPrefix + getKey());
-                       
+
                        // imported
                        writeKeyValue("imported", getImported());
-                       
+
                        closeObject(null);
-                       
+
                        return generateJson();
-               
+
                } catch (IOException ioe) {
                        LOG.warn("Error in processing file " + url + ": " + 
ioe.getMessage());
-                       throw new IOException("Error in generating JSON:" + 
ioe.getMessage()); 
+                       throw new IOException("Error in generating JSON:" + 
ioe.getMessage());
                }
        }
-       
+
        // abstract methods
-       
+
        protected abstract void writeKeyValue(String key, String value) throws 
IOException;
-       
+
        protected abstract void writeKeyNull(String key) throws IOException;
-       
+
        protected abstract void startArray(String key, boolean nested, boolean 
newline) throws IOException;
-       
+
        protected abstract void closeArray(String key, boolean nested, boolean 
newline) throws IOException;
-       
+
        protected abstract void writeArrayValue(String value) throws 
IOException;
-       
+
        protected abstract void startObject(String key) throws IOException;
-       
+
        protected abstract void closeObject(String key) throws IOException;
-       
+
        protected abstract String generateJson() throws IOException;
-       
+
        // getters
-       
+
        protected String getUrl() {
+               try {
+                       return URIUtil.encodePath(url);
+               } catch (URIException e) {
+                       LOG.error("Can't encode URL " + url);
+               }
+
                return url;
        }
-       
+
        protected String getTimestamp() {
                if (this.simpleDateFormat) {
                        String timestamp = null;
@@ -183,88 +212,88 @@ public abstract class AbstractCommonCraw
                        return 
ifNullString(metadata.get(Metadata.LAST_MODIFIED));
                }
        }
-       
+
        protected String getMethod() {
                return new String("GET");
        }
-       
+
        protected String getRequestHostName() {
                String hostName = "";
                try {
                        hostName = InetAddress.getLocalHost().getHostName();
                } catch (UnknownHostException uhe) {
-                       
+
                }
                return hostName;
        }
-       
+
        protected String getRequestHostAddress() {
                String hostAddress = "";
                try {
                        hostAddress = 
InetAddress.getLocalHost().getHostAddress();
                } catch (UnknownHostException uhe) {
-                       
+
                }
                return hostAddress;
        }
-       
+
        protected String getRequestSoftware() {
                return conf.get("http.agent.version", "");
        }
-       
+
        protected String getRequestRobots() {
                return new String("CLASSIC");
        }
-       
+
        protected String getRequestContactName() {
                return conf.get("http.agent.name", "");
        }
-       
+
        protected String getRequestContactEmail() {
                return conf.get("http.agent.email", "");
        }
-       
+
        protected String getRequestAccept() {
                return conf.get("http.accept", "");
        }
-       
+
        protected String getRequestAcceptEncoding() {
                return new String(""); // TODO
        }
-       
+
        protected String getRequestAcceptLanguage() {
                return conf.get("http.accept.language", "");
        }
-       
+
        protected String getRequestUserAgent() {
                return conf.get("http.robots.agents", "");
        }
-       
+
        protected String getResponseStatus() {
                return ifNullString(metadata.get("status"));
        }
-       
+
        protected String getResponseHostName() {
                return URLUtil.getHost(url);
        }
-       
+
        protected String getResponseAddress() {
                return ifNullString(metadata.get("_ip_"));
        }
-       
+
        protected String getResponseContentEncoding() {
                return ifNullString(metadata.get("Content-Encoding"));
        }
-       
+
        protected String getResponseContentType() {
                return ifNullString(metadata.get("Content-Type"));
        }
-       
+
        protected String getResponseDate() {
                if (this.simpleDateFormat) {
                        String timestamp = null;
                        try {
-                               long epoch = new SimpleDateFormat("EEE, d MMM yyyy HH:mm:ss z").parse(ifNullString(metadata.get("Date"))).getTime();
+                               long epoch = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss z").parse(ifNullString(metadata.get("Date"))).getTime();
                                timestamp = String.valueOf(epoch);
                        } catch (ParseException pe) {
                                LOG.warn(pe.getMessage());
@@ -274,15 +303,15 @@ public abstract class AbstractCommonCraw
                        return ifNullString(metadata.get("Date"));
                }
        }
-       
+
        protected String getResponseServer() {
                return ifNullString(metadata.get("Server"));
        }
-       
+
        protected String getResponseContent() {
-               return new String(content);
+               return new String(content.getContent());
        }
-       
+
        protected String getKey() {
                if (this.reverseKey) {
                        return this.reverseKeyValue;
@@ -291,7 +320,7 @@ public abstract class AbstractCommonCraw
                        return url;
                }
        }
-       
+
        protected String getImported() {
                if (this.simpleDateFormat) {
                        String timestamp = null;
@@ -306,11 +335,11 @@ public abstract class AbstractCommonCraw
                        return ifNullString(metadata.get("Date"));
                }
        }
-       
+
        private static String ifNullString(String value) {
                return (value != null) ? value : "";
        }
-       
+
        private void startHeaders(String key, boolean nested, boolean newline) 
throws IOException {
                if (this.jsonArray) {
                        startArray(key, nested, newline);
@@ -319,7 +348,7 @@ public abstract class AbstractCommonCraw
                        startObject(key);
                }
        }
-       
+
        private void closeHeaders(String key, boolean nested, boolean newline) 
throws IOException {
                if (this.jsonArray) {
                        closeArray(key, nested, newline);
@@ -328,7 +357,7 @@ public abstract class AbstractCommonCraw
                        closeObject(key);
                }
        }
-       
+
        private void writeKeyValueWrapper(String key, String value) throws 
IOException {
                if (this.jsonArray) {
                        startArray(null, true, false);
@@ -340,4 +369,7 @@ public abstract class AbstractCommonCraw
                        writeKeyValue(key, value);
                }
        }
+
+       @Override
+       public void close() {}
 }

Modified: nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlConfig.java
URL: 
http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlConfig.java?rev=1704594&r1=1704593&r2=1704594&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlConfig.java 
(original)
+++ nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlConfig.java Tue Sep 
22 12:23:31 2015
@@ -39,6 +39,12 @@ public class CommonCrawlConfig implement
        private boolean reverseKey = false;
        
        private String reverseKeyValue = "";
+
+       private boolean compressed = false;
+
+       private long warcSize = 0;
+
+       private String outputDir;
        
        /**
         * Default constructor
@@ -114,4 +120,28 @@ public class CommonCrawlConfig implement
        public String getReverseKeyValue() {
                return this.reverseKeyValue;
        }
+
+       public boolean isCompressed() {
+               return compressed;
+       }
+
+       public void setCompressed(boolean compressed) {
+               this.compressed = compressed;
+       }
+
+       public long getWarcSize() {
+               return warcSize;
+       }
+
+       public void setWarcSize(long warcSize) {
+               this.warcSize = warcSize;
+       }
+
+       public String getOutputDir() {
+               return outputDir;
+       }
+
+       public void setOutputDir(String outputDir) {
+               this.outputDir = outputDir;
+       }
 }

Modified: nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlDataDumper.java
URL: 
http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlDataDumper.java?rev=1704594&r1=1704593&r2=1704594&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlDataDumper.java 
(original)
+++ nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlDataDumper.java Tue 
Sep 22 12:23:31 2015
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,6 +18,7 @@
 package org.apache.nutch.tools;
 
 //JDK imports
+
 import java.io.BufferedOutputStream;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -51,13 +52,17 @@ import org.apache.commons.io.FilenameUti
 
 //Hadoop
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
 import org.apache.nutch.metadata.Metadata;
 import org.apache.nutch.protocol.Content;
+import org.apache.nutch.segment.SegmentReader;
 import org.apache.nutch.util.DumpFileUtil;
 import org.apache.nutch.util.NutchConfiguration;
 //Tika imports
@@ -90,11 +95,11 @@ import com.ibm.icu.text.SimpleDateFormat
  * <p>
  * Thus, the timestamped url key for the record is provided below followed by 
an
  * example record:
- * 
+ * <p/>
  * <pre>
  * {@code
  * com_somepage_33a3e36bbef59c2a5242c2ccee59239ab30d51f3_1411623696000
- *     
+ *
  *     {
  *         "url": "http:\/\/somepage.com\/22\/14560817",
  *         "timestamp": "1411623696000",
@@ -133,23 +138,23 @@ import com.ibm.icu.text.SimpleDateFormat
  *                 "Server": "nginx",
  *                 "...": "..."
  *             },
- *             "body": "\r\n  <!DOCTYPE html PUBLIC ... \r\n\r\n  \r\n    
</body>\r\n    </html>\r\n  \r\n\r\n",    
+ *             "body": "\r\n  <!DOCTYPE html PUBLIC ... \r\n\r\n  \r\n    
</body>\r\n    </html>\r\n  \r\n\r\n",
  *         },
  *         "key": 
"com_somepage_33a3e36bbef59c2a5242c2ccee59239ab30d51f3_1411623696000",
  *         "imported": "1411623698000"
  *     }
  *     }
  * </pre>
- * 
+ * <p/>
  * <p>
  * Upon successful completion the tool displays a very convenient JSON snippet
  * detailing the mimetype classifications and the counts of documents which 
fall
  * into those classifications. An example is as follows:
  * </p>
- * 
+ * <p/>
  * <pre>
  * {@code
- * INFO: File Types: 
+ * INFO: File Types:
  *   TOTAL Stats:    {
  *     {"mimeType":"application/xml","count":19"}
  *     {"mimeType":"image/png","count":47"}
@@ -164,465 +169,522 @@ import com.ibm.icu.text.SimpleDateFormat
  *   }
  * }
  * </pre>
- * 
  */
-public class CommonCrawlDataDumper {
+public class CommonCrawlDataDumper extends Configured implements Tool {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(CommonCrawlDataDumper.class.getName());
+
+  private CommonCrawlConfig config = null;
+
+  // Gzip initialization
+  private FileOutputStream fileOutput = null;
+  private BufferedOutputStream bufOutput = null;
+  private GzipCompressorOutputStream gzipOutput = null;
+  private TarArchiveOutputStream tarOutput = null;
+  private ArrayList<String> fileList = null;
+
+  /**
+   * Main method for invoking this tool
+   *
+   * @param args 1) output directory (which will be created if it does not
+   *             already exist) to host the CBOR data and 2) a directory
+   *             containing one or more segments from which we wish to generate
+   *             CBOR data from. Optionally, 3) a list of mimetypes and the 4)
+   *             the gzip option may be provided.
+   * @throws Exception
+   */
+  @SuppressWarnings("static-access")
+  public static void main(String[] args) throws Exception {
+    Configuration conf = NutchConfiguration.create();
+    int res = ToolRunner.run(conf, new CommonCrawlDataDumper(), args);
+    System.exit(res);
+  }
+
+  /**
+   * Constructor
+   */
+  public CommonCrawlDataDumper(CommonCrawlConfig config) {
+    this.config = config;
+  }
+
+  public CommonCrawlDataDumper() {
+  }
+
+  /**
+   * Dumps the reverse engineered CBOR content from the provided segment
+   * directories if a parent directory contains more than one segment,
+   * otherwise a single segment can be passed as an argument. If the boolean
+   * argument is provided then the CBOR is also zipped.
+   *
+   * @param outputDir      the directory you wish to dump the raw content to. 
This
+   *                       directory will be created.
+   * @param segmentRootDir a directory containing one or more segments.
+   * @param gzip           a boolean flag indicating whether the CBOR content 
should also
+   *                       be gzipped.
+   * @param epochFilename  if {@code true}, output files will be names using 
the epoch time (in milliseconds).
+   * @param extension      a file extension to use with output documents.
+   * @throws Exception if any exception occurs.
+   */
+  public void dump(File outputDir, File segmentRootDir, boolean gzip,
+      String[] mimeTypes, boolean epochFilename, String extension, boolean 
warc)
+      throws Exception {
+    if (gzip) {
+      LOG.info("Gzipping CBOR data has been skipped");
+    }
+    // total file counts
+    Map<String, Integer> typeCounts = new HashMap<String, Integer>();
+    // filtered file counters
+    Map<String, Integer> filteredCounts = new HashMap<String, Integer>();
+
+    Configuration nutchConfig = NutchConfiguration.create();
+    FileSystem fs = FileSystem.get(nutchConfig);
+    File[] segmentDirs = segmentRootDir.listFiles(new FileFilter() {
+      @Override
+      public boolean accept(File file) {
+        return file.canRead() && file.isDirectory();
+      }
+    });
+
+    if (new File(
+        segmentRootDir.getAbsolutePath() + File.separator + Content.DIR_NAME
+            + "/part-00000/data").exists()) {
+      segmentDirs = new File[] { segmentRootDir };
+    }
+
+    if (segmentDirs == null) {
+      LOG.error(
+          "No segment directories found in [" + 
segmentRootDir.getAbsolutePath()
+              + "]");
+      System.exit(1);
+    }
+
+    if (gzip && !warc) {
+      fileList = new ArrayList<String>();
+      constructNewStream(outputDir);
+    }
+
+    CommonCrawlFormat format = CommonCrawlFormatFactory
+        .getCommonCrawlFormat("JACKSON", nutchConfig, config);
+
+    if (warc) {
+      format = CommonCrawlFormatFactory
+          .getCommonCrawlFormat("WARC", nutchConfig, config);
+    }
+
+    for (File segment : segmentDirs) {
+      LOG.info("Processing segment: [" + segment.getAbsolutePath() + "]");
+      try {
+        String segmentContentPath =
+            segment.getAbsolutePath() + File.separator + Content.DIR_NAME
+                + "/part-00000/data";
+        Path file = new Path(segmentContentPath);
+
+        if (!new File(file.toString()).exists()) {
+          LOG.warn("Skipping segment: [" + segmentContentPath
+              + "]: no data directory present");
+          continue;
+        }
+        SequenceFile.Reader reader = new SequenceFile.Reader(nutchConfig,
+            SequenceFile.Reader.file(file));
+
+        if (!new File(file.toString()).exists()) {
+          LOG.warn("Skipping segment: [" + segmentContentPath
+              + "]: no data directory present");
+          continue;
+        }
+
+        Writable key = (Writable) reader.getKeyClass().newInstance();
+
+        Content content = null;
+
+        while (reader.next(key)) {
+          content = new Content();
+          reader.getCurrentValue(content);
+          Metadata metadata = content.getMetadata();
+          String url = key.toString();
+
+          String baseName = FilenameUtils.getBaseName(url);
+          String extensionName = FilenameUtils.getExtension(url);
+
+          if (!extension.isEmpty()) {
+            extensionName = extension;
+          } else if ((extensionName == null) || extensionName.isEmpty()) {
+            extensionName = "html";
+          }
+
+          String outputFullPath = null;
+          String outputRelativePath = null;
+          String filename = null;
+          String timestamp = null;
+          String reverseKey = null;
+
+          if (epochFilename || config.getReverseKey()) {
+            try {
+              long epoch = new SimpleDateFormat("EEE, d MMM yyyy HH:mm:ss z")
+                  .parse(getDate(metadata.get("Date"))).getTime();
+              timestamp = String.valueOf(epoch);
+            } catch (ParseException pe) {
+              LOG.warn(pe.getMessage());
+            }
+
+            reverseKey = reverseUrl(url);
+            config.setReverseKeyValue(
+                reverseKey.replace("/", "_") + "_" + DigestUtils.shaHex(url)
+                    + "_" + timestamp);
+          }
+
+          if (!warc) {
+            if (epochFilename) {
+              outputFullPath = DumpFileUtil
+                  .createFileNameFromUrl(outputDir.getAbsolutePath(),
+                      reverseKey, url, timestamp, extensionName, !gzip);
+              outputRelativePath = outputFullPath
+                  .substring(0, outputFullPath.lastIndexOf(File.separator) - 
1);
+              filename = content.getMetadata().get(Metadata.DATE) + "."
+                  + extensionName;
+            } else {
+              String md5Ofurl = DumpFileUtil.getUrlMD5(url);
+              String fullDir = DumpFileUtil
+                  .createTwoLevelsDirectory(outputDir.getAbsolutePath(),
+                      md5Ofurl, !gzip);
+              filename = DumpFileUtil
+                  .createFileName(md5Ofurl, baseName, extensionName);
+              outputFullPath = String.format("%s/%s", fullDir, filename);
+
+              String[] fullPathLevels = fullDir.split(File.separator);
+              String firstLevelDirName = fullPathLevels[fullPathLevels.length
+                  - 2];
+              String secondLevelDirName = fullPathLevels[fullPathLevels.length
+                  - 1];
+              outputRelativePath = firstLevelDirName + secondLevelDirName;
+            }
+          }
+          // Encode all filetypes if no mimetypes have been given
+          Boolean filter = (mimeTypes == null);
+
+          String jsonData = "";
+          try {
+            String mimeType = new Tika().detect(content.getContent());
+            // Maps file to JSON-based structure
+
+            jsonData = format.getJsonData(url, content, metadata);
+
+            collectStats(typeCounts, mimeType);
+            // collects statistics for the given mimetypes
+            if ((mimeType != null) && (mimeTypes != null) && Arrays
+                .asList(mimeTypes).contains(mimeType)) {
+              collectStats(filteredCounts, mimeType);
+              filter = true;
+            }
+          } catch (IOException ioe) {
+            LOG.error("Fatal error in creating JSON data: " + 
ioe.getMessage());
+            return;
+          }
+
+          if (!warc) {
+            if (filter) {
+              byte[] byteData = serializeCBORData(jsonData);
+
+              if (!gzip) {
+                File outputFile = new File(outputFullPath);
+                if (outputFile.exists()) {
+                  LOG.info("Skipping writing: [" + outputFullPath
+                      + "]: file already exists");
+                } else {
+                  LOG.info("Writing: [" + outputFullPath + "]");
+                  IOUtils.copy(new ByteArrayInputStream(byteData),
+                      new FileOutputStream(outputFile));
+                }
+              } else {
+                if (fileList.contains(outputFullPath)) {
+                  LOG.info("Skipping compressing: [" + outputFullPath
+                      + "]: file already exists");
+                } else {
+                  fileList.add(outputFullPath);
+                  LOG.info("Compressing: [" + outputFullPath + "]");
+                  //TarArchiveEntry tarEntry = new 
TarArchiveEntry(firstLevelDirName + File.separator + secondLevelDirName + 
File.separator + filename);
+                  TarArchiveEntry tarEntry = new TarArchiveEntry(
+                      outputRelativePath + File.separator + filename);
+                  tarEntry.setSize(byteData.length);
+                  tarOutput.putArchiveEntry(tarEntry);
+                  tarOutput.write(byteData);
+                  tarOutput.closeArchiveEntry();
+                }
+              }
+            }
+          }
+        }
+
+        reader.close();
+      } finally {
+        fs.close();
+      }
+    }
+
+    // close the format if needed
+    format.close();
+
+    if (gzip && !warc) {
+      closeStream();
+    }
+
+    if (!typeCounts.isEmpty()) {
+      LOG.info("CommonsCrawlDataDumper File Stats: " + DumpFileUtil
+          .displayFileTypes(typeCounts, filteredCounts));
+    }
+
+  }
+
+  private void closeStream() {
+    try {
+      tarOutput.finish();
+
+      tarOutput.close();
+      gzipOutput.close();
+      bufOutput.close();
+      fileOutput.close();
+    } catch (IOException ioe) {
+      LOG.warn("Error in closing stream: " + ioe.getMessage());
+    }
+  }
+
+  private void constructNewStream(File outputDir) throws IOException {
+    String archiveName = new SimpleDateFormat("yyyyMMddhhmm'.tar.gz'")
+        .format(new Date());
+    LOG.info("Creating a new gzip archive: " + archiveName);
+    fileOutput = new FileOutputStream(
+        new File(outputDir + File.separator + archiveName));
+    bufOutput = new BufferedOutputStream(fileOutput);
+    gzipOutput = new GzipCompressorOutputStream(bufOutput);
+    tarOutput = new TarArchiveOutputStream(gzipOutput);
+    tarOutput.setLongFileMode(TarArchiveOutputStream.LONGFILE_GNU);
+  }
+
+  /**
+   * Writes the CBOR "Self-Describe Tag" (value 55799, serialized as 3-byte
+   * sequence of {@code 0xd9d9f7}) at the current position. This method must
+   * be used to write the CBOR magic number at the beginning of the document.
+   * Since version 2.5, <a
+   * href="https://github.com/FasterXML/jackson-dataformat-cbor";
+   * >jackson-dataformat-cbor</a> will support the {@code WRITE_TYPE_HEADER}
+   * feature to write that type tag at the beginning of the document.
+   *
+   * @param generator {@link CBORGenerator} object used to create a 
CBOR-encoded document.
+   * @throws IOException if any I/O error occurs.
+   * @see <a href="https://tools.ietf.org/html/rfc7049#section-2.4.5";>RFC
+   * 7049</a>
+   */
+  private void writeMagicHeader(CBORGenerator generator) throws IOException {
+    // Writes self-describe CBOR
+    // https://tools.ietf.org/html/rfc7049#section-2.4.5
+    // It will be supported in jackson-cbor since 2.5
+    byte[] header = new byte[3];
+    header[0] = (byte) 0xd9;
+    header[1] = (byte) 0xd9;
+    header[2] = (byte) 0xf7;
+    generator.writeBytes(header, 0, header.length);
+  }
+
+  private byte[] serializeCBORData(String jsonData) {
+    CBORFactory factory = new CBORFactory();
+
+    CBORGenerator generator = null;
+    ByteArrayOutputStream stream = null;
+
+    try {
+      stream = new ByteArrayOutputStream();
+      generator = factory.createGenerator(stream);
+      // Writes CBOR tag
+      writeMagicHeader(generator);
+      generator.writeString(jsonData);
+      generator.flush();
+      stream.flush();
+
+      return stream.toByteArray();
+
+    } catch (Exception e) {
+      LOG.warn("CBOR encoding failed: " + e.getMessage());
+    } finally {
+      try {
+        generator.close();
+        stream.close();
+      } catch (IOException e) {
+        // nothing to do
+      }
+    }
+
+    return null;
+  }
+
+  private void collectStats(Map<String, Integer> typeCounts, String mimeType) {
+    typeCounts.put(mimeType,
+        typeCounts.containsKey(mimeType) ? typeCounts.get(mimeType) + 1 : 1);
+  }
+
+  /**
+   * Gets the current date if the given timestamp is empty or null.
+   *
+   * @param timestamp the timestamp
+   * @return the current timestamp if the given one is null.
+   */
+  private String getDate(String timestamp) {
+    if (timestamp == null || timestamp.isEmpty()) {
+      DateFormat dateFormat = new SimpleDateFormat(
+          "EEE, d MMM yyyy HH:mm:ss z");
+      timestamp = dateFormat.format(new Date());
+    }
+    return timestamp;
+
+  }
+
+  public static String reverseUrl(String urlString) {
+    URL url;
+    String reverseKey = null;
+    try {
+      url = new URL(urlString);
+
+      String[] hostPart = url.getHost().replace('.', '/').split("/");
+
+      StringBuilder sb = new StringBuilder();
+      sb.append(hostPart[hostPart.length - 1]);
+      for (int i = hostPart.length - 2; i >= 0; i--) {
+        sb.append("/" + hostPart[i]);
+      }
 
-       private static final Logger LOG = 
LoggerFactory.getLogger(CommonCrawlDataDumper.class.getName());
-       
-       private CommonCrawlConfig config = null;
-       
-       // Gzip initialization
-       private FileOutputStream fileOutput = null;
-       private BufferedOutputStream bufOutput = null;
-       private GzipCompressorOutputStream gzipOutput = null;
-       private TarArchiveOutputStream tarOutput = null;
-       private ArrayList<String> fileList = null;
-
-       /**
-        * Main method for invoking this tool
-        * 
-        * @param args
-        *            1) output directory (which will be created if it does not
-        *            already exist) to host the CBOR data and 2) a directory
-        *            containing one or more segments from which we wish to 
generate
-        *            CBOR data from. Optionally, 3) a list of mimetypes and 
the 4) 
-        *            the gzip option may be provided.
-        * @throws Exception
-        */
-       @SuppressWarnings("static-access")
-       public static void main(String[] args) throws Exception {
-               Option helpOpt = new Option("h", "help", false,
-                               "show this help message.");
-               // argument options
-               Option outputOpt = OptionBuilder
-                               .withArgName("outputDir")
-                               .hasArg()
-                               .withDescription(
-                                               "output directory (which will 
be created) to host the CBOR data.")
-                               .create("outputDir");
-               Option segOpt = OptionBuilder
-                               .withArgName("segment")
-                               .hasArgs()
-                               .withDescription("the segment(s) to use")
-                               .create("segment");
-               // create mimetype and gzip options
-               Option mimeOpt = OptionBuilder
-                               .isRequired(false)
-                               .withArgName("mimetype")
-                               .hasArgs()
-                               .withDescription(
-                                               "an optional list of mimetypes 
to dump, excluding all others. Defaults to all.")
-                               .create("mimetype");
-               Option gzipOpt = OptionBuilder
-                               .withArgName("gzip")
-                               .hasArg(false)
-                               .withDescription(
-                                               "an optional flag indicating 
whether to additionally gzip the data.")
-                               .create("gzip");
-               Option keyPrefixOpt = OptionBuilder
-                               .withArgName("keyPrefix")
-                               .hasArg(true)
-                               .withDescription("an optional prefix for key in 
the output format.")
-                               .create("keyPrefix");
-               Option simpleDateFormatOpt = OptionBuilder
-                               .withArgName("SimpleDateFormat")
-                               .hasArg(false)
-                               .withDescription("an optional format for 
timestamp in GMT epoch milliseconds.")
-                               .create("SimpleDateFormat");
-               Option epochFilenameOpt = OptionBuilder
-                               .withArgName("epochFilename")
-                               .hasArg(false)
-                               .withDescription("an optional format for output 
filename.")
-                               .create("epochFilename");
-               Option jsonArrayOpt = OptionBuilder
-                               .withArgName("jsonArray")
-                               .hasArg(false)
-                               .withDescription("an optional format for JSON 
output.")
-                               .create("jsonArray");
-               Option reverseKeyOpt = OptionBuilder
-                               .withArgName("reverseKey")
-                               .hasArg(false)
-                               .withDescription("an optional format for key 
value in JSON output.")
-                               .create("reverseKey");
-               Option extensionOpt = OptionBuilder
-                               .withArgName("extension")
-                               .hasArg(true)
-                               .withDescription("an optional file extension 
for output documents.")
-                               .create("extension");
-
-               // create the options
-               Options options = new Options();
-               options.addOption(helpOpt);
-               options.addOption(outputOpt);
-               options.addOption(segOpt);
-               // create mimetypes and gzip options
-               options.addOption(mimeOpt);
-               options.addOption(gzipOpt);
-               // create keyPrefix option
-               options.addOption(keyPrefixOpt);
-               // create simpleDataFormat option
-               options.addOption(simpleDateFormatOpt);
-               options.addOption(epochFilenameOpt);
-               options.addOption(jsonArrayOpt);
-               options.addOption(reverseKeyOpt);
-               options.addOption(extensionOpt);
-
-               CommandLineParser parser = new GnuParser();
-               try {
-                       CommandLine line = parser.parse(options, args);
-                       if (line.hasOption("help") || 
!line.hasOption("outputDir") || (!line.hasOption("segment"))) {
-                               HelpFormatter formatter = new HelpFormatter();
-                               
formatter.printHelp(CommonCrawlDataDumper.class.getName(), options, true);
-                               return;
-                       }
-
-                       File outputDir = new 
File(line.getOptionValue("outputDir"));
-                       File segmentRootDir = new 
File(line.getOptionValue("segment"));
-                       String[] mimeTypes = line.getOptionValues("mimetype");
-                       boolean gzip = line.hasOption("gzip");
-                       boolean epochFilename = line.hasOption("epochFilename");
-                       
-                       String keyPrefix = line.getOptionValue("keyPrefix", "");
-                       boolean simpleDateFormat = 
line.hasOption("SimpleDateFormat");
-                       boolean jsonArray = line.hasOption("jsonArray");
-                       boolean reverseKey = line.hasOption("reverseKey");
-                       String extension = line.getOptionValue("extension", "");
-                       
-                       CommonCrawlConfig config = new CommonCrawlConfig();
-                       config.setKeyPrefix(keyPrefix);
-                       config.setSimpleDateFormat(simpleDateFormat);
-                       config.setJsonArray(jsonArray);
-                       config.setReverseKey(reverseKey);
-
-                       if (!outputDir.exists()) {
-                               LOG.warn("Output directory: [" + 
outputDir.getAbsolutePath() + "]: does not exist, creating it.");
-                               if (!outputDir.mkdirs())
-                                       throw new Exception("Unable to create: 
[" + outputDir.getAbsolutePath() + "]");
-                       }
-
-                       CommonCrawlDataDumper dumper = new 
CommonCrawlDataDumper(config);
-                       
-                       dumper.dump(outputDir, segmentRootDir, gzip, mimeTypes, 
epochFilename, extension);
-                       
-               } catch (Exception e) {
-                       LOG.error(CommonCrawlDataDumper.class.getName() + ": " 
+ StringUtils.stringifyException(e));
-                       e.printStackTrace();
-                       return;
-               }
-       }
-       
-       /**
-        * Constructor
-        */
-       public CommonCrawlDataDumper(CommonCrawlConfig config) {
-               this.config = config;
-       }
-       
-       /**
-        * Dumps the reverse engineered CBOR content from the provided segment
-        * directories if a parent directory contains more than one segment,
-        * otherwise a single segment can be passed as an argument. If the 
boolean
-        * argument is provided then the CBOR is also zipped.
-        * 
-        * @param outputDir
-        *            the directory you wish to dump the raw content to. This
-        *            directory will be created.
-        * @param segmentRootDir
-        *            a directory containing one or more segments.
-        * @param gzip
-        *            a boolean flag indicating whether the CBOR content should 
also
-        *            be gzipped.
-        * @param mimetypes
-        *            an array of mime types we have to dump, all others will be
-     *            filtered out.
-     * @param epochFilename
-     *            if {@code true}, output files will be names using the epoch 
time (in milliseconds).
-     * @param extension
-     *            a file extension to use with output documents.
-        * @throws Exception if any exception occurs.
-        */
-       public void dump(File outputDir, File segmentRootDir, boolean gzip,     
String[] mimeTypes, boolean epochFilename, String extension) throws Exception {
-               if (gzip) {
-                       LOG.info("Gzipping CBOR data has been skipped");
-               }
-               // total file counts
-               Map<String, Integer> typeCounts = new HashMap<String, 
Integer>();
-               // filtered file counters
-               Map<String, Integer> filteredCounts = new HashMap<String, 
Integer>();
-               
-               Configuration nutchConfig = NutchConfiguration.create();
-               FileSystem fs = FileSystem.get(nutchConfig);
-               File[] segmentDirs = segmentRootDir.listFiles(new FileFilter() {
-                       @Override
-                       public boolean accept(File file) {
-                               return file.canRead() && file.isDirectory();
-                       }
-               });
-               
-               if (segmentDirs == null) {
-                       LOG.error("No segment directories found in [" + 
segmentRootDir.getAbsolutePath() + "]");
-                       System.exit(1);
-               }
-               
-               if (gzip) {
-                       fileList = new ArrayList<String>();
-                   constructNewStream(outputDir);
-               }
-
-               for (File segment : segmentDirs) {
-                       LOG.info("Processing segment: [" + 
segment.getAbsolutePath() + "]");
-                       try {
-                               String segmentContentPath = 
segment.getAbsolutePath() + File.separator + Content.DIR_NAME + 
"/part-00000/data";
-                               Path file = new Path(segmentContentPath);
-
-                               if (!new File(file.toString()).exists()) {
-                                       LOG.warn("Skipping segment: [" + 
segmentContentPath     + "]: no data directory present");
-                                       continue;
-                               }
-                               SequenceFile.Reader reader = new 
SequenceFile.Reader(nutchConfig, SequenceFile.Reader.file(file));
-
-                               if (!new File(file.toString()).exists()) {
-                                       LOG.warn("Skipping segment: [" + 
segmentContentPath     + "]: no data directory present");
-                                       continue;
-                               }
-                               Writable key = (Writable) 
reader.getKeyClass().newInstance();
-                               
-                               Content content = null;
-
-                               while (reader.next(key)) {
-                                       content = new Content();
-                                       reader.getCurrentValue(content);
-                                       Metadata metadata = 
content.getMetadata();
-                                       String url = key.toString();
-                                       String baseName = 
FilenameUtils.getBaseName(url);
-                                       String extensionName = 
FilenameUtils.getExtension(url);
-                                       
-                                       if (!extension.isEmpty()) {
-                                               extensionName = extension;
-                                       }
-                                       else if ((extensionName == null) || 
extensionName.isEmpty()) {
-                                               extensionName = "html";
-                                       }
-                                       
-                                       String outputFullPath = null;
-                                       String outputRelativePath = null;
-                                       String filename = null;
-                                       String timestamp = null;
-                                       String reverseKey = null;
-                                       
-                                       if (epochFilename || 
config.getReverseKey()) {  
-                                               try {
-                                                       long epoch = new 
SimpleDateFormat("EEE, d MMM yyyy HH:mm:ss 
z").parse(getDate(metadata.get("Date"))).getTime();
-                                                       timestamp = 
String.valueOf(epoch);
-                                               } catch (ParseException pe) {
-                                                       
LOG.warn(pe.getMessage());
-                                               }
-                                               
-                                               reverseKey = reverseUrl(url);
-                                               
config.setReverseKeyValue(reverseKey.replace("/", "_") + "_" + 
DigestUtils.shaHex(url) + "_" + timestamp);
-                                       }       
-                                       
-                                       if (epochFilename) {
-                                               outputFullPath = 
DumpFileUtil.createFileNameFromUrl(outputDir.getAbsolutePath(), reverseKey, 
url, timestamp, extensionName, !gzip);
-                                               outputRelativePath = 
outputFullPath.substring(0, outputFullPath.lastIndexOf(File.separator)-1);
-                                               filename = 
content.getMetadata().get(Metadata.DATE) + "." + extensionName;
-                                       }
-                                       else {
-                                               String md5Ofurl = 
DumpFileUtil.getUrlMD5(url);
-                                               String fullDir = 
DumpFileUtil.createTwoLevelsDirectory(outputDir.getAbsolutePath(), md5Ofurl, 
!gzip);
-                                               filename = 
DumpFileUtil.createFileName(md5Ofurl, baseName, extensionName);
-                                               outputFullPath = 
String.format("%s/%s", fullDir, filename);
-       
-                                               String [] fullPathLevels = 
fullDir.split(File.separator);
-                                               String firstLevelDirName = 
fullPathLevels[fullPathLevels.length-2]; 
-                                               String secondLevelDirName = 
fullPathLevels[fullPathLevels.length-1];
-                                               outputRelativePath = 
firstLevelDirName + secondLevelDirName;
-                                       }
-                                       
-                                       // Encode all filetypes if no mimetypes 
have been given
-                                       Boolean filter = (mimeTypes == null);
-                                       
-                                       String jsonData = "";
-                                       try {
-                                               String mimeType = new 
Tika().detect(content.getContent());
-                                               // Maps file to JSON-based 
structure
-                                               CommonCrawlFormat format = 
CommonCrawlFormatFactory.getCommonCrawlFormat("JACKSON", url, 
content.getContent(), metadata, nutchConfig, config);
-                                               jsonData = format.getJsonData();
-
-                                               collectStats(typeCounts, 
mimeType);
-                                               // collects statistics for the 
given mimetypes
-                                               if ((mimeType != null) && 
(mimeTypes != null) && Arrays.asList(mimeTypes).contains(mimeType)) {
-                                                       
collectStats(filteredCounts, mimeType);
-                                                       filter = true;
-                                               }
-                                       } catch (IOException ioe) { 
-                                               LOG.error("Fatal error in 
creating JSON data: " + ioe.getMessage());
-                                               return;
-                                       }
-
-                                       if (filter) {
-                                               byte[] byteData = 
serializeCBORData(jsonData);
-                                               
-                                               if (!gzip) {
-                                                       File outputFile = new 
File(outputFullPath);
-                                                       if 
(outputFile.exists()) {
-                                                               
LOG.info("Skipping writing: [" + outputFullPath + "]: file already exists");
-                                                       }
-                                                       else {
-                                                               
LOG.info("Writing: [" + outputFullPath + "]");
-                                                               
IOUtils.copy(new ByteArrayInputStream(byteData), new 
FileOutputStream(outputFile));
-                                                       }
-                                               }
-                                               else {
-                                                       if 
(fileList.contains(outputFullPath)) {
-                                                               
LOG.info("Skipping compressing: [" + outputFullPath + "]: file already exists");
-                                                       }
-                                                       else {
-                                                               
fileList.add(outputFullPath);
-                                                               
LOG.info("Compressing: [" + outputFullPath + "]");
-                                                               
//TarArchiveEntry tarEntry = new TarArchiveEntry(firstLevelDirName + 
File.separator + secondLevelDirName + File.separator + filename);
-                                                               TarArchiveEntry 
tarEntry = new TarArchiveEntry(outputRelativePath + File.separator + filename);
-                                                               
tarEntry.setSize(byteData.length);
-                                                               
tarOutput.putArchiveEntry(tarEntry);
-                                                               
tarOutput.write(byteData);
-                                                               
tarOutput.closeArchiveEntry();
-                                                       }
-                                               }
-                                       }
-                               }
-                               reader.close();
-                       } finally {
-                               fs.close();
-                       }
-               }
-               
-               if (gzip) {
-               closeStream();
-               }
-               
-               if (!typeCounts.isEmpty()) {
-                       LOG.info("CommonsCrawlDataDumper File Stats: " + 
DumpFileUtil.displayFileTypes(typeCounts, filteredCounts));
-               }
-       }
-       
-       private void closeStream() {
-               try {
-                       tarOutput.finish();
-                       
-               tarOutput.close();
-               gzipOutput.close();
-               bufOutput.close();
-               fileOutput.close();
-               } catch (IOException ioe) {
-                       LOG.warn("Error in closing stream: " + 
ioe.getMessage());
-               }
-       }
-       
-       private void constructNewStream(File outputDir) throws IOException {    
-               String archiveName = new 
SimpleDateFormat("yyyyMMddhhmm'.tar.gz'").format(new Date());
-               LOG.info("Creating a new gzip archive: " + archiveName);
-           fileOutput = new FileOutputStream(new File(outputDir + 
File.separator + archiveName));
-           bufOutput = new BufferedOutputStream(fileOutput);
-           gzipOutput = new GzipCompressorOutputStream(bufOutput);
-           tarOutput = new TarArchiveOutputStream(gzipOutput);
-           tarOutput.setLongFileMode(TarArchiveOutputStream.LONGFILE_GNU);
-       }
-       
-       /**
-        * Writes the CBOR "Self-Describe Tag" (value 55799, serialized as 
3-byte
-        * sequence of {@code 0xd9d9f7}) at the current position. This method 
must
-        * be used to write the CBOR magic number at the beginning of the 
document.
-        * Since version 2.5, <a
-        * href="https://github.com/FasterXML/jackson-dataformat-cbor";
-        * >jackson-dataformat-cbor</a> will support the {@code 
WRITE_TYPE_HEADER}
-        * feature to write that type tag at the beginning of the document.
-        * 
-        * @see <a href="https://tools.ietf.org/html/rfc7049#section-2.4.5";>RFC
-        *      7049</a>
-        * @param generator {@link CBORGenerator} object used to create a 
CBOR-encoded document.
-        * @throws IOException if any I/O error occurs.
-        */
-       private void writeMagicHeader(CBORGenerator generator) throws 
IOException {
-               // Writes self-describe CBOR
-               // https://tools.ietf.org/html/rfc7049#section-2.4.5
-               // It will be supported in jackson-cbor since 2.5
-               byte[] header = new byte[3];
-               header[0] = (byte) 0xd9;
-               header[1] = (byte) 0xd9;
-               header[2] = (byte) 0xf7;
-               generator.writeBytes(header, 0, header.length);
-       }
-       
-       private byte[] serializeCBORData(String jsonData) {
-               CBORFactory factory = new CBORFactory();
-               
-               CBORGenerator generator = null;
-               ByteArrayOutputStream stream = null;
-               
-               try {
-                       stream = new ByteArrayOutputStream();
-                       generator = factory.createGenerator(stream);
-                       // Writes CBOR tag
-                       writeMagicHeader(generator);
-                       generator.writeString(jsonData);
-                       generator.flush();
-                       stream.flush();
-                       
-                       return stream.toByteArray();
-                       
-               } catch (Exception e) {
-                       LOG.warn("CBOR encoding failed: " + e.getMessage());
-               } finally {
-                       try {
-                               generator.close();
-                               stream.close();
-                       } catch (IOException e) {
-                               // nothing to do
-                       }
-               }
-               
-               return null;
-       }
-
-       private void collectStats(Map<String, Integer> typeCounts, String 
mimeType) {
-               typeCounts.put(mimeType, typeCounts.containsKey(mimeType) ? 
typeCounts.get(mimeType) + 1 : 1);
-       }
-       
-       /**
-        * Gets the current date if the given timestamp is empty or null.
-        * @param timestamp the timestamp
-        * @return the current timestamp if the given one is null.
-        */
-       private String getDate(String timestamp) {
-               if (timestamp == null || timestamp.isEmpty()) {
-                       DateFormat dateFormat = new SimpleDateFormat("EEE, d 
MMM yyyy HH:mm:ss z");
-                       timestamp = dateFormat.format(new Date());
-               }
-               return timestamp;
-                       
-       }
-       
-       public static String reverseUrl(String urlString) {
-       URL url = null;
-               String reverseKey = null;
-               try {
-                       url = new URL(urlString);
-                       
-                       String[] hostPart = url.getHost().replace('.', 
'/').split("/");
-                       
-                       StringBuilder sb = new StringBuilder();
-                       sb.append(hostPart[hostPart.length-1]);
-                       for (int i = hostPart.length-2; i >= 0; i--) {
-                               sb.append("/" + hostPart[i]);
-                       }
-                       
-                       reverseKey = sb.toString();
-
-               } catch (MalformedURLException e) {
-                       LOG.error("Failed to parse URL: {}", urlString);
-               }
-               
-               return reverseKey;
+      reverseKey = sb.toString();
+
+    } catch (MalformedURLException e) {
+      LOG.error("Failed to parse URL: {}", urlString);
     }
+
+    return reverseKey;
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    Option helpOpt = new Option("h", "help", false, "show this help message.");
+    // argument options
+    Option outputOpt = OptionBuilder.withArgName("outputDir").hasArg()
+        .withDescription(
+            "output directory (which will be created) to host the CBOR data.")
+        .create("outputDir");
+    // WARC format
+    Option warcOpt = new Option("warc", "export to a WARC file");
+
+    Option segOpt = OptionBuilder.withArgName("segment").hasArgs()
+        .withDescription("the segment or directory containing segments to 
use").create("segment");
+    // create mimetype and gzip options
+    Option mimeOpt = OptionBuilder.isRequired(false).withArgName("mimetype")
+        .hasArgs().withDescription(
+            "an optional list of mimetypes to dump, excluding all others. 
Defaults to all.")
+        .create("mimetype");
+    Option gzipOpt = OptionBuilder.withArgName("gzip").hasArg(false)
+        .withDescription(
+            "an optional flag indicating whether to additionally gzip the 
data.")
+        .create("gzip");
+    Option keyPrefixOpt = OptionBuilder.withArgName("keyPrefix").hasArg(true)
+        .withDescription("an optional prefix for key in the output format.")
+        .create("keyPrefix");
+    Option simpleDateFormatOpt = OptionBuilder.withArgName("SimpleDateFormat")
+        .hasArg(false).withDescription(
+            "an optional format for timestamp in GMT epoch milliseconds.")
+        .create("SimpleDateFormat");
+    Option epochFilenameOpt = OptionBuilder.withArgName("epochFilename")
+        .hasArg(false)
+        .withDescription("an optional format for output filename.")
+        .create("epochFilename");
+    Option jsonArrayOpt = OptionBuilder.withArgName("jsonArray").hasArg(false)
+        .withDescription("an optional format for JSON output.")
+        .create("jsonArray");
+    Option reverseKeyOpt = 
OptionBuilder.withArgName("reverseKey").hasArg(false)
+        .withDescription("an optional format for key value in JSON output.")
+        .create("reverseKey");
+    Option extensionOpt = OptionBuilder.withArgName("extension").hasArg(true)
+        .withDescription("an optional file extension for output documents.")
+        .create("extension");
+    Option sizeOpt = OptionBuilder.withArgName("warcSize").hasArg(true)
+        .withType(Number.class)
+        .withDescription("an optional file size in bytes for the WARC file(s)")
+        .create("warcSize");
+
+    // create the options
+    Options options = new Options();
+    options.addOption(helpOpt);
+    options.addOption(outputOpt);
+    options.addOption(segOpt);
+    // create mimetypes and gzip options
+    options.addOption(warcOpt);
+    options.addOption(mimeOpt);
+    options.addOption(gzipOpt);
+    // create keyPrefix option
+    options.addOption(keyPrefixOpt);
+    // create simpleDataFormat option
+    options.addOption(simpleDateFormatOpt);
+    options.addOption(epochFilenameOpt);
+    options.addOption(jsonArrayOpt);
+    options.addOption(reverseKeyOpt);
+    options.addOption(extensionOpt);
+    options.addOption(sizeOpt);
+
+    CommandLineParser parser = new GnuParser();
+    try {
+      CommandLine line = parser.parse(options, args);
+      if (line.hasOption("help") || !line.hasOption("outputDir") || (!line
+          .hasOption("segment"))) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter
+            .printHelp(CommonCrawlDataDumper.class.getName(), options, true);
+        return 0;
+      }
+
+      File outputDir = new File(line.getOptionValue("outputDir"));
+      File segmentRootDir = new File(line.getOptionValue("segment"));
+      String[] mimeTypes = line.getOptionValues("mimetype");
+      boolean gzip = line.hasOption("gzip");
+      boolean epochFilename = line.hasOption("epochFilename");
+
+      String keyPrefix = line.getOptionValue("keyPrefix", "");
+      boolean simpleDateFormat = line.hasOption("SimpleDateFormat");
+      boolean jsonArray = line.hasOption("jsonArray");
+      boolean reverseKey = line.hasOption("reverseKey");
+      String extension = line.getOptionValue("extension", "");
+      boolean warc = line.hasOption("warc");
+      long warcSize = 0;
+
+      if (line.getParsedOptionValue("warcSize") != null) {
+        warcSize = (Long) line.getParsedOptionValue("warcSize");
+      }
+
+      CommonCrawlConfig config = new CommonCrawlConfig();
+      config.setKeyPrefix(keyPrefix);
+      config.setSimpleDateFormat(simpleDateFormat);
+      config.setJsonArray(jsonArray);
+      config.setReverseKey(reverseKey);
+      config.setCompressed(gzip);
+      config.setWarcSize(warcSize);
+      config.setOutputDir(line.getOptionValue("outputDir"));
+
+      if (!outputDir.exists()) {
+        LOG.warn("Output directory: [" + outputDir.getAbsolutePath()
+            + "]: does not exist, creating it.");
+        if (!outputDir.mkdirs())
+          throw new Exception(
+              "Unable to create: [" + outputDir.getAbsolutePath() + "]");
+      }
+
+      CommonCrawlDataDumper dumper = new CommonCrawlDataDumper(config);
+
+      dumper.dump(outputDir, segmentRootDir, gzip, mimeTypes, epochFilename,
+          extension, warc);
+
+    } catch (Exception e) {
+      LOG.error(CommonCrawlDataDumper.class.getName() + ": " + StringUtils
+          .stringifyException(e));
+      e.printStackTrace();
+      return -1;
+    }
+
+    return 0;
+  }
 }

Modified: nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormat.java
URL: 
http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormat.java?rev=1704594&r1=1704593&r2=1704594&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormat.java 
(original)
+++ nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormat.java Tue Sep 
22 12:23:31 2015
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,22 +17,55 @@
 
 package org.apache.nutch.tools;
 
+import org.apache.nutch.metadata.Metadata;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.protocol.Content;
+
 import java.io.IOException;
 
 /**
  * Interface for all CommonCrawl formatter. It provides the signature for the
  * method used to get JSON data.
- * 
+ *
  * @author gtotaro
  *
  */
 public interface CommonCrawlFormat {
 
-       /**
-        * 
-        * @param mapAll If {@code true} maps all metdata on the JSON structure.
-        * @return the JSON data
-        */
-       //public String getJsonData(boolean mapAll) throws IOException;
-       public String getJsonData() throws IOException;
+  /**
+   *
+   * @param mapAll If {@code true} maps all metadata on the JSON structure.
+   * @return the JSON data
+   */
+  //public String getJsonData(boolean mapAll) throws IOException;
+  public String getJsonData() throws IOException;
+
+  /**
+   * Returns a string representation of the JSON structure of the URL content.
+   *
+   * @param url the canonical URL of the crawled resource
+   * @param content the raw fetched content of the URL
+   * @param metadata the fetch metadata associated with the URL
+   * @return the JSON representation of the URL content
+   */
+  public String getJsonData(String url, Content content, Metadata metadata)
+      throws IOException;
+
+  /**
+   * Returns a string representation of the JSON structure of the URL content,
+   * taking into account the parsed metadata about the URL.
+   *
+   * @param url the canonical URL of the crawled resource
+   * @param content the raw fetched content of the URL
+   * @param metadata the fetch metadata associated with the URL
+   * @param parseData the parse metadata extracted from the URL content
+   * @return the JSON representation of the URL content
+   */
+  public String getJsonData(String url, Content content, Metadata metadata,
+      ParseData parseData) throws IOException;
+
+  /**
+   * Releases any resources held by the concrete format (e.g. open output
+   * streams). Formats that hold no resources may provide an empty
+   * implementation.
+   */
+  public abstract void close();
 }

Modified: 
nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormatFactory.java
URL: 
http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormatFactory.java?rev=1704594&r1=1704593&r2=1704594&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormatFactory.java 
(original)
+++ nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormatFactory.java 
Tue Sep 22 12:23:31 2015
@@ -21,6 +21,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.nutch.metadata.Metadata;
+import org.apache.nutch.protocol.Content;
 
 /**
  * Factory class that creates new {@see CommonCrawlFormat} objects (a.k.a. 
formatter) that map crawled files to CommonCrawl format.   
@@ -38,8 +39,9 @@ public class CommonCrawlFormatFactory {
         * @param config the CommonCrawl output configuration.
         * @return the new {@see CommonCrawlFormat} object.
         * @throws IOException If any I/O error occurs.
+        * @deprecated
         */
-       public static CommonCrawlFormat getCommonCrawlFormat(String formatType, 
String url, byte[] content,     Metadata metadata, Configuration nutchConf, 
CommonCrawlConfig config) throws IOException {
+       public static CommonCrawlFormat getCommonCrawlFormat(String formatType, 
String url, Content content,    Metadata metadata, Configuration nutchConf, 
CommonCrawlConfig config) throws IOException {
                if (formatType == null) {
                        return null;
                }
@@ -56,4 +58,17 @@ public class CommonCrawlFormatFactory {
                
                return null;
        }
+
+       // The format should not depend on per-document attributes; a single
+       // instance should be reused for the whole job.
+       public static CommonCrawlFormat getCommonCrawlFormat(String formatType, 
Configuration nutchConf, CommonCrawlConfig config) throws IOException {
+               if (formatType.equalsIgnoreCase("WARC")) {
+                       return new CommonCrawlFormatWARC(nutchConf, config);
+               }
+
+               if (formatType.equalsIgnoreCase("JACKSON")) {
+                       return new CommonCrawlFormatJackson( nutchConf, config);
+               }
+               return null;
+       }
 }

Modified: 
nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormatJackson.java
URL: 
http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormatJackson.java?rev=1704594&r1=1704593&r2=1704594&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormatJackson.java 
(original)
+++ nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormatJackson.java 
Tue Sep 22 12:23:31 2015
@@ -25,6 +25,7 @@ import org.apache.nutch.metadata.Metadat
 
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.nutch.protocol.Content;
 
 /**
  * This class provides methods to map crawled data on JSON using Jackson 
Streaming APIs. 
@@ -36,8 +37,18 @@ public class CommonCrawlFormatJackson ex
        
        private JsonGenerator generator;
 
+       public CommonCrawlFormatJackson(Configuration nutchConf,
+                       CommonCrawlConfig config) throws IOException {
+               super(null, null, null, nutchConf, config);
+
+               JsonFactory factory = new JsonFactory();
+               this.out = new ByteArrayOutputStream();
+               this.generator = factory.createGenerator(out);
+
+               this.generator.useDefaultPrettyPrinter(); // INDENTED OUTPUT
+       }
        
-       public CommonCrawlFormatJackson(String url, byte[] content, Metadata 
metadata, Configuration nutchConf, CommonCrawlConfig config) throws IOException 
{
+       public CommonCrawlFormatJackson(String url, Content content, Metadata 
metadata, Configuration nutchConf, CommonCrawlConfig config) throws IOException 
{
                super(url, content, metadata, nutchConf, config);
                
                JsonFactory factory = new JsonFactory();

Modified: 
nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormatJettinson.java
URL: 
http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormatJettinson.java?rev=1704594&r1=1704593&r2=1704594&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormatJettinson.java 
(original)
+++ nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormatJettinson.java 
Tue Sep 22 12:23:31 2015
@@ -23,6 +23,7 @@ import java.util.Deque;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.nutch.metadata.Metadata;
+import org.apache.nutch.protocol.Content;
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
@@ -37,7 +38,7 @@ public class CommonCrawlFormatJettinson
        
        private Deque<JSONArray> stackArrays;
 
-       public CommonCrawlFormatJettinson(String url, byte[] content, Metadata 
metadata, Configuration nutchConf, CommonCrawlConfig config) throws IOException 
{
+       public CommonCrawlFormatJettinson(String url, Content content, Metadata 
metadata, Configuration nutchConf, CommonCrawlConfig config) throws IOException 
{
                super(url, content, metadata, nutchConf, config);
                
                stackObjects = new ArrayDeque<JSONObject>();

Modified: 
nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormatSimple.java
URL: 
http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormatSimple.java?rev=1704594&r1=1704593&r2=1704594&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormatSimple.java 
(original)
+++ nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormatSimple.java 
Tue Sep 22 12:23:31 2015
@@ -21,6 +21,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.nutch.metadata.Metadata;
+import org.apache.nutch.protocol.Content;
 
 /**
  * This class provides methods to map crawled data on JSON using a {@see 
StringBuilder} object. 
@@ -32,7 +33,7 @@ public class CommonCrawlFormatSimple ext
        
        private int tabCount;
        
-       public CommonCrawlFormatSimple(String url, byte[] content, Metadata 
metadata, Configuration nutchConf, CommonCrawlConfig config) throws IOException 
{
+       public CommonCrawlFormatSimple(String url, Content content, Metadata 
metadata, Configuration nutchConf, CommonCrawlConfig config) throws IOException 
{
                super(url, content, metadata, nutchConf, config);
                
                this.sb = new StringBuilder();

Added: nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormatWARC.java
URL: 
http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormatWARC.java?rev=1704594&view=auto
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormatWARC.java 
(added)
+++ nutch/trunk/src/java/org/apache/nutch/tools/CommonCrawlFormatWARC.java Tue 
Sep 22 12:23:31 2015
@@ -0,0 +1,286 @@
+package org.apache.nutch.tools;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.text.ParseException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.ibm.icu.text.SimpleDateFormat;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.metadata.Metadata;
+import org.apache.nutch.parse.ParseData;
+
+import org.apache.nutch.parse.ParseSegment;
+import org.apache.nutch.protocol.Content;
+import org.archive.format.warc.WARCConstants;
+import org.archive.io.WriterPoolMember;
+import org.archive.io.warc.WARCRecordInfo;
+import org.archive.io.warc.WARCWriter;
+import org.archive.io.warc.WARCWriterPoolSettingsData;
+import org.archive.uid.UUIDGenerator;
+import org.archive.util.DateUtils;
+import org.archive.util.anvl.ANVLRecord;
+
+public class CommonCrawlFormatWARC extends AbstractCommonCrawlFormat {
+
+  public static final String MAX_WARC_FILE_SIZE = "warc.file.size.max";
+  public static final String TEMPLATE = "${prefix}-${timestamp17}-${serialno}";
+
+  private static final AtomicInteger SERIALNO = new AtomicInteger();
+  private final static UUIDGenerator GENERATOR = new UUIDGenerator();
+
+  private String outputDir = null;
+  private ByteArrayOutputStream out;
+  private WARCWriter writer;
+  private ParseData parseData;
+
+  public CommonCrawlFormatWARC(Configuration nutchConf,
+      CommonCrawlConfig config) throws IOException {
+    super(null, null, null, nutchConf, config);
+
+    this.out = new ByteArrayOutputStream();
+
+    ANVLRecord info = WARCUtils.getWARCInfoContent(nutchConf);
+    List<String> md = Collections.singletonList(info.toString());
+
+    this.outputDir = config.getOutputDir();
+
+    if (null == outputDir) {
+      String message = "Missing output directory configuration: " + outputDir;
+
+      throw new RuntimeException(message);
+    }
+
+    File file = new File(outputDir);
+
+    long maxSize = WARCConstants.DEFAULT_MAX_WARC_FILE_SIZE;
+
+    if (config.getWarcSize() > 0) {
+      maxSize = config.getWarcSize();
+    }
+
+    WARCWriterPoolSettingsData settings = new WARCWriterPoolSettingsData(
+        WriterPoolMember.DEFAULT_PREFIX, TEMPLATE, maxSize,
+        config.isCompressed(), Arrays.asList(new File[] { file }), md,
+        new UUIDGenerator());
+
+    writer = new WARCWriter(SERIALNO, settings);
+  }
+
+  public CommonCrawlFormatWARC(String url, Content content, Metadata metadata,
+      Configuration nutchConf, CommonCrawlConfig config, ParseData parseData)
+      throws IOException {
+    super(url, content, metadata, nutchConf, config);
+
+    this.out = new ByteArrayOutputStream();
+    this.parseData = parseData;
+
+    ANVLRecord info = WARCUtils.getWARCInfoContent(conf);
+    List<String> md = Collections.singletonList(info.toString());
+
+    this.outputDir = config.getOutputDir();
+
+    if (null == outputDir) {
+      String message = "Missing output directory configuration: " + outputDir;
+
+      throw new RuntimeException(message);
+    }
+
+    File file = new File(outputDir);
+
+    long maxSize = WARCConstants.DEFAULT_MAX_WARC_FILE_SIZE;
+
+    if (config.getWarcSize() > 0) {
+      maxSize = config.getWarcSize();
+    }
+
+    WARCWriterPoolSettingsData settings = new WARCWriterPoolSettingsData(
+        WriterPoolMember.DEFAULT_PREFIX, TEMPLATE, maxSize,
+        config.isCompressed(), Arrays.asList(new File[] { file }), md,
+        new UUIDGenerator());
+
+    writer = new WARCWriter(SERIALNO, settings);
+  }
+
+  public String getJsonData(String url, Content content, Metadata metadata,
+      ParseData parseData) throws IOException {
+    this.url = url;
+    this.content = content;
+    this.metadata = metadata;
+    this.parseData = parseData;
+
+    return this.getJsonData();
+  }
+
+  @Override
+  public String getJsonData() throws IOException {
+
+    long position = writer.getPosition();
+
+    try {
+      // See if we need to open a new file because we've exceeded maxBytes
+
+      // checkSize will open a new file if we exceeded the maxBytes setting
+      writer.checkSize();
+
+      if (writer.getPosition() != position) {
+        // We just closed the file because it was larger than maxBytes.
+        position = writer.getPosition();
+      }
+
+      // response record
+      URI id = writeResponse();
+
+      if (StringUtils.isNotBlank(metadata.get("_request_"))) {
+        // write the request method if any request info is found
+        writeRequest(id);
+      }
+    } catch (IOException e) {
+      // Launch the corresponding IO error
+      throw e;
+    } catch (ParseException e) {
+      // do nothing, as we can't establish a valid WARC-Date for this record
+      // lets skip it altogether
+      LOG.error("Can't get a valid date from: {}", url);
+    }
+
+    return null;
+  }
+
+  protected URI writeResponse() throws IOException, ParseException {
+    WARCRecordInfo record = new WARCRecordInfo();
+
+    record.setType(WARCConstants.WARCRecordType.response);
+    record.setUrl(getUrl());
+
+    String fetchTime;
+
+    record.setCreate14DigitDate(DateUtils
+        .getLog14Date(Long.parseLong(metadata.get("nutch.fetch.time"))));
+    record.setMimetype(WARCConstants.HTTP_RESPONSE_MIMETYPE);
+    record.setRecordId(GENERATOR.getRecordID());
+
+    String IP = getResponseAddress();
+
+    if (StringUtils.isNotBlank(IP))
+      record.addExtraHeader(WARCConstants.HEADER_KEY_IP, IP);
+
+    if (ParseSegment.isTruncated(content))
+      record.addExtraHeader(WARCConstants.HEADER_KEY_TRUNCATED, "unspecified");
+
+    ByteArrayOutputStream output = new ByteArrayOutputStream();
+
+    String httpHeaders = metadata.get("_response.headers_");
+
+    if (StringUtils.isNotBlank(httpHeaders)) {
+      output.write(httpHeaders.getBytes());
+    } else {
+      // change the record type to resource as we do not have information
+      // about the headers
+      record.setType(WARCConstants.WARCRecordType.resource);
+      record.setMimetype(content.getContentType());
+    }
+
+    output.write(getResponseContent().getBytes());
+
+    record.setContentLength(output.size());
+    record.setContentStream(new ByteArrayInputStream(output.toByteArray()));
+
+    if (output.size() > 0) {
+      // avoid generating a 0 sized record, as the webarchive library will
+      // complain about it
+      writer.writeRecord(record);
+    }
+
+    return record.getRecordId();
+  }
+
+  protected URI writeRequest(URI id) throws IOException, ParseException {
+    WARCRecordInfo record = new WARCRecordInfo();
+
+    record.setType(WARCConstants.WARCRecordType.request);
+    record.setUrl(getUrl());
+    record.setCreate14DigitDate(DateUtils
+        .getLog14Date(Long.parseLong(metadata.get("nutch.fetch.time"))));
+    record.setMimetype(WARCConstants.HTTP_REQUEST_MIMETYPE);
+    record.setRecordId(GENERATOR.getRecordID());
+
+    if (id != null) {
+      ANVLRecord headers = new ANVLRecord();
+      headers.addLabelValue(WARCConstants.HEADER_KEY_CONCURRENT_TO,
+          '<' + id.toString() + '>');
+      record.setExtraHeaders(headers);
+    }
+
+    ByteArrayOutputStream output = new ByteArrayOutputStream();
+
+    output.write(metadata.get("_request_").getBytes());
+    record.setContentLength(output.size());
+    record.setContentStream(new ByteArrayInputStream(output.toByteArray()));
+
+    writer.writeRecord(record);
+
+    return record.getRecordId();
+  }
+
+  @Override
+  protected String generateJson() throws IOException {
+    return null;
+  }
+
+  @Override
+  protected void writeKeyValue(String key, String value) throws IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  protected void writeKeyNull(String key) throws IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  protected void startArray(String key, boolean nested, boolean newline)
+      throws IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  protected void closeArray(String key, boolean nested, boolean newline)
+      throws IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  protected void writeArrayValue(String value) throws IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  protected void startObject(String key) throws IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  protected void closeObject(String key) throws IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public void close() {
+    if (writer != null)
+      try {
+        writer.close();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+  }
+}

Added: nutch/trunk/src/java/org/apache/nutch/tools/WARCUtils.java
URL: 
http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/tools/WARCUtils.java?rev=1704594&view=auto
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/tools/WARCUtils.java (added)
+++ nutch/trunk/src/java/org/apache/nutch/tools/WARCUtils.java Tue Sep 22 
12:23:31 2015
@@ -0,0 +1,154 @@
+package org.apache.nutch.tools;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.indexer.NutchDocument;
+import org.apache.nutch.util.StringUtil;
+import org.archive.format.http.HttpHeaders;
+import org.archive.format.warc.WARCConstants;
+import org.archive.io.warc.WARCRecordInfo;
+import org.archive.uid.UUIDGenerator;
+import org.archive.util.DateUtils;
+import org.archive.util.anvl.ANVLRecord;
+
+public class WARCUtils {
+    public final static String SOFTWARE = "software";
+    public final static String HTTP_HEADER_FROM = "http-header-from";
+    public final static String HTTP_HEADER_USER_AGENT = 
"http-header-user-agent";
+    public final static String HOSTNAME = "hostname";
+    public final static String ROBOTS = "robots";
+    public final static String OPERATOR = "operator";
+    public final static String FORMAT = "format";
+    public final static String CONFORMS_TO = "conformsTo";
+    public final static String IP = "ip";
+    public final static UUIDGenerator generator = new UUIDGenerator();
+
+    public static final ANVLRecord getWARCInfoContent(Configuration conf) {
+        ANVLRecord record = new ANVLRecord();
+
+        // informative headers
+        record.addLabelValue(FORMAT, "WARC File Format 1.0");
+        record.addLabelValue(CONFORMS_TO, 
"http://bibnum.bnf.fr/WARC/WARC_ISO_28500_version1_latestdraft.pdf");
+
+        record.addLabelValue(SOFTWARE, conf.get("http.agent.name", ""));
+        record.addLabelValue(HTTP_HEADER_USER_AGENT,
+                getAgentString(conf.get("http.agent.name", ""),
+                        conf.get("http.agent.version", ""),
+                        conf.get("http.agent.description", ""),
+                        conf.get("http.agent.url", ""),
+                        conf.get("http.agent.email", "")));
+        record.addLabelValue(HTTP_HEADER_FROM,
+                conf.get("http.agent.email", ""));
+
+        try {
+            record.addLabelValue(HOSTNAME, getHostname(conf));
+            record.addLabelValue(IP, getIPAddress(conf));
+        } catch (UnknownHostException ignored) {
+            // do nothing as this fields are optional
+        }
+
+        record.addLabelValue(ROBOTS, "classic"); // TODO Make configurable?
+        record.addLabelValue(OPERATOR, conf.get("http.agent.email", ""));
+
+        return record;
+    }
+
+    public static final String getHostname(Configuration conf)
+            throws UnknownHostException {
+
+        return StringUtil.isEmpty(conf.get("http.agent.host", "")) ?
+                InetAddress.getLocalHost().getHostName() :
+                conf.get("http.agent.host");
+    }
+
+    public static final String getIPAddress(Configuration conf)
+            throws UnknownHostException {
+
+        return InetAddress.getLocalHost().getHostAddress();
+    }
+
+    public static final byte[] toByteArray(HttpHeaders headers)
+            throws IOException {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        headers.write(out);
+
+        return out.toByteArray();
+    }
+
+    public static final String getAgentString(String name, String version,
+            String description, String URL, String email) {
+
+        StringBuffer buf = new StringBuffer();
+
+        buf.append(name);
+
+        if (version != null) {
+            buf.append("/").append(version);
+        }
+
+        if (((description != null) && (description.length() != 0)) || (
+                (email != null) && (email.length() != 0)) || ((URL != null) && 
(
+                URL.length() != 0))) {
+            buf.append(" (");
+
+            if ((description != null) && (description.length() != 0)) {
+                buf.append(description);
+                if ((URL != null) || (email != null))
+                    buf.append("; ");
+            }
+
+            if ((URL != null) && (URL.length() != 0)) {
+                buf.append(URL);
+                if (email != null)
+                    buf.append("; ");
+            }
+
+            if ((email != null) && (email.length() != 0))
+                buf.append(email);
+
+            buf.append(")");
+        }
+
+        return buf.toString();
+    }
+
+    public static final WARCRecordInfo docToMetadata(NutchDocument doc)
+            throws UnsupportedEncodingException {
+        WARCRecordInfo record = new WARCRecordInfo();
+
+        record.setType(WARCConstants.WARCRecordType.metadata);
+        record.setUrl((String) doc.getFieldValue("id"));
+        record.setCreate14DigitDate(
+                DateUtils.get14DigitDate((Date) doc.getFieldValue("tstamp")));
+        record.setMimetype("application/warc-fields");
+        record.setRecordId(generator.getRecordID());
+
+        // metadata
+        ANVLRecord metadata = new ANVLRecord();
+
+        for (String field : doc.getFieldNames()) {
+            List<Object> values = doc.getField(field).getValues();
+            for (Object value : values) {
+                if (value instanceof Date) {
+                    metadata.addLabelValue(field, DateUtils.get14DigitDate());
+                } else {
+                    metadata.addLabelValue(field, (String) value);
+                }
+            }
+        }
+
+        record.setContentLength(metadata.getLength());
+        record.setContentStream(
+                new ByteArrayInputStream(metadata.getUTF8Bytes()));
+
+        return record;
+    }
+}

Modified: 
nutch/trunk/src/plugin/protocol-http/src/java/org/apache/nutch/protocol/http/HttpResponse.java
URL: 
http://svn.apache.org/viewvc/nutch/trunk/src/plugin/protocol-http/src/java/org/apache/nutch/protocol/http/HttpResponse.java?rev=1704594&r1=1704593&r2=1704594&view=diff
==============================================================================
--- 
nutch/trunk/src/plugin/protocol-http/src/java/org/apache/nutch/protocol/http/HttpResponse.java
 (original)
+++ 
nutch/trunk/src/plugin/protocol-http/src/java/org/apache/nutch/protocol/http/HttpResponse.java
 Tue Sep 22 12:23:31 2015
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -43,7 +43,9 @@ import org.apache.nutch.protocol.Protoco
 import org.apache.nutch.protocol.http.api.HttpBase;
 import org.apache.nutch.protocol.http.api.HttpException;
 
-/** An HTTP response. */
+/**
+ * An HTTP response.
+ */
 public class HttpResponse implements Response {
 
   private Configuration conf;
@@ -54,6 +56,8 @@ public class HttpResponse implements Res
   private byte[] content;
   private int code;
   private Metadata headers = new SpellCheckedMetadata();
+  // used for storing the http headers verbatim
+  private StringBuffer httpHeaders;
 
   protected enum Scheme {
     HTTP, HTTPS,
@@ -61,7 +65,7 @@ public class HttpResponse implements Res
 
   /**
    * Default public constructor.
-   * 
+   *
    * @param http
    * @param url
    * @param datum
@@ -125,24 +129,24 @@ public class HttpResponse implements Res
       if (scheme == Scheme.HTTPS) {
         SSLSocketFactory factory = (SSLSocketFactory) SSLSocketFactory
             .getDefault();
-        SSLSocket sslsocket = (SSLSocket) factory.createSocket(socket,
-            sockHost, sockPort, true);
+        SSLSocket sslsocket = (SSLSocket) factory
+            .createSocket(socket, sockHost, sockPort, true);
         sslsocket.setUseClientMode(true);
 
         // Get the protocols and ciphers supported by this JVM
-        Set<String> protocols = new HashSet<String>(Arrays.asList(sslsocket
-            .getSupportedProtocols()));
-        Set<String> ciphers = new HashSet<String>(Arrays.asList(sslsocket
-            .getSupportedCipherSuites()));
+        Set<String> protocols = new HashSet<String>(
+            Arrays.asList(sslsocket.getSupportedProtocols()));
+        Set<String> ciphers = new HashSet<String>(
+            Arrays.asList(sslsocket.getSupportedCipherSuites()));
 
         // Intersect with preferred protocols and ciphers
         protocols.retainAll(http.getTlsPreferredProtocols());
         ciphers.retainAll(http.getTlsPreferredCipherSuites());
 
-        sslsocket.setEnabledProtocols(protocols.toArray(new String[protocols
-            .size()]));
-        sslsocket.setEnabledCipherSuites(ciphers.toArray(new String[ciphers
-            .size()]));
+        sslsocket.setEnabledProtocols(
+            protocols.toArray(new String[protocols.size()]));
+        sslsocket.setEnabledCipherSuites(
+            ciphers.toArray(new String[ciphers.size()]));
 
         sslsocket.startHandshake();
         socket = sslsocket;
@@ -193,34 +197,54 @@ public class HttpResponse implements Res
       reqStr.append("\r\n");
 
       if (http.isIfModifiedSinceEnabled() && datum.getModifiedTime() > 0) {
-        reqStr.append("If-Modified-Since: "
-            + HttpDateFormat.toString(datum.getModifiedTime()));
+        reqStr.append("If-Modified-Since: " + HttpDateFormat
+            .toString(datum.getModifiedTime()));
         reqStr.append("\r\n");
       }
       reqStr.append("\r\n");
 
+      // optionally store the raw HTTP request in the metadata (store.http.request)
+      if (conf.getBoolean("store.http.request", false) == true) {
+        headers.add("_request_", reqStr.toString());
+      }
+
       byte[] reqBytes = reqStr.toString().getBytes();
 
       req.write(reqBytes);
       req.flush();
 
       PushbackInputStream in = // process response
-      new PushbackInputStream(new BufferedInputStream(socket.getInputStream(),
-          Http.BUFFER_SIZE), Http.BUFFER_SIZE);
+          new PushbackInputStream(
+              new BufferedInputStream(socket.getInputStream(),
+                  Http.BUFFER_SIZE), Http.BUFFER_SIZE);
 
       StringBuffer line = new StringBuffer();
 
+      // store the http headers verbatim
+      if (conf.getBoolean("store.http.headers", false) == true) {
+        httpHeaders = new StringBuffer();
+      }
+
+      headers.add("nutch.fetch.time", 
Long.toString(System.currentTimeMillis()));
+
       boolean haveSeenNonContinueStatus = false;
       while (!haveSeenNonContinueStatus) {
         // parse status code line
         this.code = parseStatusLine(in, line);
+        if (httpHeaders != null)
+          httpHeaders.append(line).append("\n");
         // parse headers
-        parseHeaders(in, line);
+        parseHeaders(in, line, httpHeaders);
         haveSeenNonContinueStatus = code != 100; // 100 is "Continue"
       }
+
+      if (httpHeaders != null) {
+        headers.add("_response.headers_", httpHeaders.toString());
+      }
+
       String transferEncoding = getHeader(Response.TRANSFER_ENCODING);
-      if (transferEncoding != null
-          && "chunked".equalsIgnoreCase(transferEncoding.trim())) {
+      if (transferEncoding != null && "chunked"
+          .equalsIgnoreCase(transferEncoding.trim())) {
         readChunkedContent(in, line);
       } else {
         readPlainContent(in);
@@ -274,8 +298,8 @@ public class HttpResponse implements Res
    * -------------------------
    */
 
-  private void readPlainContent(InputStream in) throws HttpException,
-      IOException {
+  private void readPlainContent(InputStream in)
+      throws HttpException, IOException {
 
     int contentLength = Integer.MAX_VALUE; // get content length
     String contentLengthString = headers.get(Response.CONTENT_LENGTH);
@@ -288,9 +312,10 @@ public class HttpResponse implements Res
         throw new HttpException("bad content length: " + contentLengthString);
       }
     }
-    if (http.getMaxContent() >= 0 && contentLength > http.getMaxContent()) // 
limit
-                                                                           // 
download
-                                                                           // 
size
+    if (http.getMaxContent() >= 0 && contentLength > http
+        .getMaxContent()) // limit
+      // download
+      // size
       contentLength = http.getMaxContent();
 
     ByteArrayOutputStream out = new ByteArrayOutputStream(Http.BUFFER_SIZE);
@@ -323,7 +348,6 @@ public class HttpResponse implements Res
   }
 
   /**
-   * 
    * @param in
    * @param line
    * @throws HttpException
@@ -368,16 +392,17 @@ public class HttpResponse implements Res
         break;
       }
 
-      if (http.getMaxContent() >= 0
-          && (contentBytesRead + chunkLen) > http.getMaxContent())
+      if (http.getMaxContent() >= 0 && (contentBytesRead + chunkLen) > http
+          .getMaxContent())
         chunkLen = http.getMaxContent() - contentBytesRead;
 
       // read one chunk
       int chunkBytesRead = 0;
       while (chunkBytesRead < chunkLen) {
 
-        int toRead = (chunkLen - chunkBytesRead) < Http.BUFFER_SIZE ? (chunkLen - chunkBytesRead)
-            : Http.BUFFER_SIZE;
+        int toRead = (chunkLen - chunkBytesRead) < Http.BUFFER_SIZE ?
+            (chunkLen - chunkBytesRead) :
+            Http.BUFFER_SIZE;
         int len = in.read(bytes, 0, toRead);
 
         if (len == -1)
@@ -405,7 +430,7 @@ public class HttpResponse implements Res
     }
 
     content = out.toByteArray();
-    parseHeaders(in, line);
+    parseHeaders(in, line, null);
 
   }
 
@@ -425,15 +450,15 @@ public class HttpResponse implements Res
     try {
       code = Integer.parseInt(line.substring(codeStart + 1, codeEnd));
     } catch (NumberFormatException e) {
-      throw new HttpException("bad status line '" + line + "': "
-          + e.getMessage(), e);
+      throw new HttpException(
+          "bad status line '" + line + "': " + e.getMessage(), e);
     }
 
     return code;
   }
 
-  private void processHeaderLine(StringBuffer line) throws IOException,
-      HttpException {
+  private void processHeaderLine(StringBuffer line)
+      throws IOException, HttpException {
 
     int colonIndex = line.indexOf(":"); // key is up to colon
     if (colonIndex == -1) {
@@ -459,16 +484,19 @@ public class HttpResponse implements Res
   }
 
   // Adds headers to our headers Metadata
-  private void parseHeaders(PushbackInputStream in, StringBuffer line)
-      throws IOException, HttpException {
+  private void parseHeaders(PushbackInputStream in, StringBuffer line,
+      StringBuffer httpHeaders) throws IOException, HttpException {
 
     while (readLine(in, line, true) != 0) {
 
+      if (httpHeaders != null)
+        httpHeaders.append(line).append("\n");
+
       // handle HTTP responses with missing blank line after headers
       int pos;
-      if (((pos = line.indexOf("<!DOCTYPE")) != -1)
-          || ((pos = line.indexOf("<HTML")) != -1)
-          || ((pos = line.indexOf("<html")) != -1)) {
+      if (((pos = line.indexOf("<!DOCTYPE")) != -1) || (
+          (pos = line.indexOf("<HTML")) != -1) || ((pos = line.indexOf("<html"))
+          != -1)) {
 
         in.unread(line.substring(pos).getBytes("UTF-8"));
         line.setLength(pos);
@@ -526,4 +554,4 @@ public class HttpResponse implements Res
     return value;
   }
 
-}
+}
\ No newline at end of file



Reply via email to