// http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/AbstractCommonCrawlFormat.java
// ----------------------------------------------------------------------
// diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/AbstractCommonCrawlFormat.java b/nutch-core/src/main/java/org/apache/nutch/tools/AbstractCommonCrawlFormat.java
// new file mode 100644
// index 0000000..1b425c4
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nutch.tools;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
import java.net.URLEncoder;
import java.net.UnknownHostException;
import java.text.ParseException;
import java.util.List;

import org.apache.commons.httpclient.URIException;
import org.apache.commons.httpclient.util.URIUtil;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.nutch.metadata.Metadata;
import org.apache.nutch.parse.ParseData;
import org.apache.nutch.protocol.Content;
import org.apache.nutch.util.URLUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.ibm.icu.text.SimpleDateFormat;

/**
 * Abstract base class implementing the {@link CommonCrawlFormat} interface.
 *
 * <p>
 * {@link #getJsonData()} drives the layout of one record (url, timestamp,
 * request, response, key, imported, optional inlinks) through the abstract
 * {@code write*}/{@code start*}/{@code close*} hooks; concrete subclasses
 * decide how those hooks are serialized and what {@link #generateJson()}
 * returns.
 * </p>
 */
public abstract class AbstractCommonCrawlFormat implements CommonCrawlFormat {
  protected static final Logger LOG = LoggerFactory.getLogger(AbstractCommonCrawlFormat.class.getName());

  /** Date pattern used for the record timestamp and the "imported" field. */
  private static final String DATE_PATTERN_SHORT_DAY = "EEE, d MMM yyyy HH:mm:ss z";

  /** Date pattern used for the response "Date" header. */
  private static final String DATE_PATTERN_LONG_DAY = "EEE, dd MMM yyyy HH:mm:ss z";

  protected String url;

  protected Content content;

  protected Metadata metadata;

  protected Configuration conf;

  protected String keyPrefix;

  protected boolean simpleDateFormat;

  protected boolean jsonArray;

  protected boolean reverseKey;

  protected String reverseKeyValue;

  protected List<String> inLinks;

  /**
   * Creates a format bound to one document and copies the output options
   * (key prefix, date format, header layout, reverse key) from
   * {@code config}.
   *
   * @param url       URL of the document
   * @param content   fetched content of the document
   * @param metadata  metadata of the document
   * @param nutchConf configuration the request-side values are read from
   * @param config    output options
   * @throws IOException declared for subclasses; not thrown here
   */
  public AbstractCommonCrawlFormat(String url, Content content, Metadata metadata, Configuration nutchConf, CommonCrawlConfig config) throws IOException {
    this.url = url;
    this.content = content;
    this.metadata = metadata;
    this.conf = nutchConf;

    this.keyPrefix = config.getKeyPrefix();
    this.simpleDateFormat = config.getSimpleDateFormat();
    this.jsonArray = config.getJsonArray();
    this.reverseKey = config.getReverseKey();
    this.reverseKeyValue = config.getReverseKeyValue();
  }

  /**
   * Re-targets this instance at another document and serializes it.
   *
   * @param url      URL of the document
   * @param content  fetched content of the document
   * @param metadata metadata of the document
   * @return the value produced by {@link #generateJson()}
   * @throws IOException if serialization fails
   */
  public String getJsonData(String url, Content content, Metadata metadata)
      throws IOException {
    this.url = url;
    this.content = content;
    this.metadata = metadata;

    return this.getJsonData();
  }

  /**
   * Variant taking parse data; not supported by this base class.
   *
   * @throws NotImplementedException always, unless overridden
   */
  public String getJsonData(String url, Content content, Metadata metadata,
      ParseData parseData) throws IOException {

    // override of this is required in the actual formats
    throw new NotImplementedException();
  }

  /**
   * Serializes the current document.
   *
   * @return the value produced by {@link #generateJson()}
   * @throws IOException if any of the writer hooks fails; the original
   *                     exception is attached as the cause
   */
  @Override
  public String getJsonData() throws IOException {
    try {
      startObject(null);

      // url
      writeKeyValue("url", getUrl());

      // timestamp
      writeKeyValue("timestamp", getTimestamp());

      // request
      startObject("request");
      writeKeyValue("method", getMethod());
      startObject("client");
      writeKeyValue("hostname", getRequestHostName());
      writeKeyValue("address", getRequestHostAddress());
      writeKeyValue("software", getRequestSoftware());
      writeKeyValue("robots", getRequestRobots());
      startObject("contact");
      writeKeyValue("name", getRequestContactName());
      writeKeyValue("email", getRequestContactEmail());
      closeObject("contact");
      closeObject("client");
      // request headers
      startHeaders("headers", false, true);
      writeKeyValueWrapper("Accept", getRequestAccept());
      writeKeyValueWrapper("Accept-Encoding", getRequestAcceptEncoding());
      writeKeyValueWrapper("Accept-Language", getRequestAcceptLanguage());
      writeKeyValueWrapper("User-Agent", getRequestUserAgent());
      closeHeaders("headers", false, true);
      writeKeyNull("body");
      closeObject("request");

      // response
      startObject("response");
      writeKeyValue("status", getResponseStatus());
      startObject("server");
      writeKeyValue("hostname", getResponseHostName());
      writeKeyValue("address", getResponseAddress());
      closeObject("server");
      // response headers: the four well-known ones first, then the rest
      startHeaders("headers", false, true);
      writeKeyValueWrapper("Content-Encoding", getResponseContentEncoding());
      writeKeyValueWrapper("Content-Type", getResponseContentType());
      writeKeyValueWrapper("Date", getResponseDate());
      writeKeyValueWrapper("Server", getResponseServer());
      for (String name : metadata.names()) {
        if (isWellKnownResponseHeader(name)) {
          // already written above
          continue;
        }
        writeKeyValueWrapper(name, metadata.get(name));
      }
      closeHeaders("headers", false, true);
      writeKeyValue("body", getResponseContent());
      closeObject("response");

      // key: build the prefix in a local so that repeated calls on the same
      // instance do not keep appending "-" to the keyPrefix field
      String prefix = (this.keyPrefix == null || this.keyPrefix.isEmpty())
          ? "" : this.keyPrefix + "-";
      writeKeyValue("key", prefix + getKey());

      // imported
      writeKeyValue("imported", getImported());

      List<String> links = getInLinks();
      if (links != null) {
        startArray("inlinks", false, true);
        for (String link : links) {
          writeArrayValue(link);
        }
        closeArray("inlinks", false, true);
      }
      closeObject(null);

      return generateJson();

    } catch (IOException ioe) {
      LOG.warn("Error in processing file " + url + ": " + ioe.getMessage());
      // keep the original exception as the cause so the stack trace survives
      throw new IOException("Error in generating JSON:" + ioe.getMessage(), ioe);
    }
  }

  // abstract methods

  protected abstract void writeKeyValue(String key, String value) throws IOException;

  protected abstract void writeKeyNull(String key) throws IOException;

  protected abstract void startArray(String key, boolean nested, boolean newline) throws IOException;

  protected abstract void closeArray(String key, boolean nested, boolean newline) throws IOException;

  protected abstract void writeArrayValue(String value) throws IOException;

  protected abstract void startObject(String key) throws IOException;

  protected abstract void closeObject(String key) throws IOException;

  protected abstract String generateJson() throws IOException;

  // getters

  /**
   * @return the path-encoded URL, or the raw URL if it cannot be encoded
   */
  protected String getUrl() {
    try {
      return URIUtil.encodePath(url);
    } catch (URIException e) {
      LOG.error("Can't encode URL " + url, e);
    }

    return url;
  }

  /**
   * @return the Last-Modified metadata value, converted as described in
   *         {@link #toOutputDate(String, String)}
   */
  protected String getTimestamp() {
    return toOutputDate(metadata.get(Metadata.LAST_MODIFIED), DATE_PATTERN_SHORT_DAY);
  }

  protected String getMethod() {
    return "GET";
  }

  /**
   * @return the local host name, or an empty string if it cannot be resolved
   */
  protected String getRequestHostName() {
    try {
      return InetAddress.getLocalHost().getHostName();
    } catch (UnknownHostException uhe) {
      // best effort: the record is still useful without the crawler host
      LOG.debug("Can't resolve local host name: {}", uhe.getMessage());
      return "";
    }
  }

  /**
   * @return the local host address, or an empty string if it cannot be
   *         resolved
   */
  protected String getRequestHostAddress() {
    try {
      return InetAddress.getLocalHost().getHostAddress();
    } catch (UnknownHostException uhe) {
      // best effort: the record is still useful without the crawler address
      LOG.debug("Can't resolve local host address: {}", uhe.getMessage());
      return "";
    }
  }

  protected String getRequestSoftware() {
    return conf.get("http.agent.version", "");
  }

  protected String getRequestRobots() {
    return "CLASSIC";
  }

  protected String getRequestContactName() {
    return conf.get("http.agent.name", "");
  }

  protected String getRequestContactEmail() {
    return conf.get("http.agent.email", "");
  }

  protected String getRequestAccept() {
    return conf.get("http.accept", "");
  }

  protected String getRequestAcceptEncoding() {
    return ""; // TODO
  }

  protected String getRequestAcceptLanguage() {
    return conf.get("http.accept.language", "");
  }

  protected String getRequestUserAgent() {
    return conf.get("http.robots.agents", "");
  }

  protected String getResponseStatus() {
    return ifNullString(metadata.get("status"));
  }

  protected String getResponseHostName() {
    return URLUtil.getHost(url);
  }

  protected String getResponseAddress() {
    return ifNullString(metadata.get("_ip_"));
  }

  protected String getResponseContentEncoding() {
    return ifNullString(metadata.get("Content-Encoding"));
  }

  protected String getResponseContentType() {
    return ifNullString(metadata.get("Content-Type"));
  }

  public List<String> getInLinks() {
    return inLinks;
  }

  public void setInLinks(List<String> inLinks) {
    this.inLinks = inLinks;
  }

  /**
   * @return the "Date" metadata value, converted as described in
   *         {@link #toOutputDate(String, String)}
   */
  protected String getResponseDate() {
    return toOutputDate(metadata.get("Date"), DATE_PATTERN_LONG_DAY);
  }

  protected String getResponseServer() {
    return ifNullString(metadata.get("Server"));
  }

  protected String getResponseContent() {
    // NOTE(review): decodes with the platform default charset, as before;
    // confirm whether the content's declared encoding should be used instead.
    return new String(content.getContent());
  }

  /**
   * @return the configured reverse key value when reverse keys are enabled,
   *         otherwise the document URL
   */
  protected String getKey() {
    return this.reverseKey ? this.reverseKeyValue : url;
  }

  /**
   * @return the "Date" metadata value, converted as described in
   *         {@link #toOutputDate(String, String)}
   */
  protected String getImported() {
    return toOutputDate(metadata.get("Date"), DATE_PATTERN_SHORT_DAY);
  }

  /**
   * Shared date conversion for the timestamp, response date and imported
   * fields.
   *
   * @param rawValue metadata value, may be {@code null}
   * @param pattern  pattern used to parse {@code rawValue} when the simple
   *                 date format option is on
   * @return with the simple date format option off, {@code rawValue} (or an
   *         empty string for {@code null}); with it on, the epoch
   *         milliseconds as a string, or {@code null} if the value cannot be
   *         parsed
   */
  private String toOutputDate(String rawValue, String pattern) {
    String value = ifNullString(rawValue);
    if (!this.simpleDateFormat) {
      return value;
    }
    try {
      // NOTE(review): parses with the default locale, as before.
      long epoch = new SimpleDateFormat(pattern).parse(value).getTime();
      return String.valueOf(epoch);
    } catch (ParseException pe) {
      LOG.warn(pe.getMessage());
      return null;
    }
  }

  /** Tells whether {@code name} is one of the headers written explicitly. */
  private static boolean isWellKnownResponseHeader(String name) {
    return name.equalsIgnoreCase("Content-Encoding")
        || name.equalsIgnoreCase("Content-Type")
        || name.equalsIgnoreCase("Date")
        || name.equalsIgnoreCase("Server");
  }

  private static String ifNullString(String value) {
    return (value != null) ? value : "";
  }

  /** Opens the headers container: an array in JSON-array mode, else an object. */
  private void startHeaders(String key, boolean nested, boolean newline) throws IOException {
    if (this.jsonArray) {
      startArray(key, nested, newline);
    } else {
      startObject(key);
    }
  }

  /** Closes the container opened by {@link #startHeaders}. */
  private void closeHeaders(String key, boolean nested, boolean newline) throws IOException {
    if (this.jsonArray) {
      closeArray(key, nested, newline);
    } else {
      closeObject(key);
    }
  }

  /**
   * Writes one header: a nested two-element array in JSON-array mode,
   * otherwise a plain key/value pair.
   */
  private void writeKeyValueWrapper(String key, String value) throws IOException {
    if (this.jsonArray) {
      startArray(null, true, false);
      writeArrayValue(key);
      writeArrayValue(value);
      closeArray(null, true, false);
    } else {
      writeKeyValue(key, value);
    }
  }

  @Override
  public void close() {}
}
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/Benchmark.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/Benchmark.java b/nutch-core/src/main/java/org/apache/nutch/tools/Benchmark.java new file mode 100755 index 0000000..ba42745 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/tools/Benchmark.java @@ -0,0 +1,284 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nutch.tools; + +import java.io.OutputStream; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.util.ToolRunner; +import org.apache.nutch.crawl.CrawlDb; +import org.apache.nutch.crawl.CrawlDbReader; +import org.apache.nutch.crawl.Generator; +import org.apache.nutch.crawl.Injector; +import org.apache.nutch.crawl.LinkDb; +import org.apache.nutch.fetcher.Fetcher; +import org.apache.nutch.parse.ParseSegment; +import org.apache.nutch.util.NutchConfiguration; +import org.apache.nutch.util.NutchJob; + +public class Benchmark extends Configured implements Tool { + private static final Log LOG = LogFactory.getLog(Benchmark.class); + + public static void main(String[] args) throws Exception { + Configuration conf = NutchConfiguration.create(); + int res = ToolRunner.run(conf, new Benchmark(), args); + System.exit(res); + } + + @SuppressWarnings("unused") + private static String getDate() { + return new SimpleDateFormat("yyyyMMddHHmmss").format(new Date(System + .currentTimeMillis())); + } + + private void createSeeds(FileSystem fs, Path seedsDir, int count) + throws Exception { + OutputStream os = fs.create(new Path(seedsDir, "seeds")); + for (int i = 0; i < count; i++) { + String url = "http://www.test-" + i + ".com/\r\n"; + os.write(url.getBytes()); + } + os.flush(); + os.close(); + } + + public static final class BenchmarkResults { + Map<String, Map<String, Long>> timings = new HashMap<String, Map<String, Long>>(); + List<String> runs = new ArrayList<String>(); + List<String> stages 
= new ArrayList<String>(); + int seeds, depth, threads; + boolean delete; + long topN; + long elapsed; + String plugins; + + public void addTiming(String stage, String run, long timing) { + if (!runs.contains(run)) { + runs.add(run); + } + if (!stages.contains(stage)) { + stages.add(stage); + } + Map<String, Long> t = timings.get(stage); + if (t == null) { + t = new HashMap<String, Long>(); + timings.put(stage, t); + } + t.put(run, timing); + } + + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("* Plugins:\t" + plugins + "\n"); + sb.append("* Seeds:\t" + seeds + "\n"); + sb.append("* Depth:\t" + depth + "\n"); + sb.append("* Threads:\t" + threads + "\n"); + sb.append("* TopN:\t" + topN + "\n"); + sb.append("* Delete:\t" + delete + "\n"); + sb.append("* TOTAL ELAPSED:\t" + elapsed + "\n"); + for (String stage : stages) { + Map<String, Long> timing = timings.get(stage); + if (timing == null) + continue; + sb.append("- stage: " + stage + "\n"); + for (String r : runs) { + Long Time = timing.get(r); + if (Time == null) { + continue; + } + sb.append("\trun " + r + "\t" + Time + "\n"); + } + } + return sb.toString(); + } + + public List<String> getStages() { + return stages; + } + + public List<String> getRuns() { + return runs; + } + } + + public int run(String[] args) throws Exception { + String plugins = "protocol-http|parse-tika|scoring-opic|urlfilter-regex|urlnormalizer-pass"; + int seeds = 1; + int depth = 10; + int threads = 10; + boolean delete = true; + long topN = Long.MAX_VALUE; + + if (args.length == 0) { + System.err + .println("Usage: Benchmark [-seeds NN] [-depth NN] [-threads NN] [-keep] [-maxPerHost NN] [-plugins <regex>]"); + System.err + .println("\t-seeds NN\tcreate NN unique hosts in a seed list (default: 1)"); + System.err.println("\t-depth NN\tperform NN crawl cycles (default: 10)"); + System.err + .println("\t-threads NN\tuse NN threads per Fetcher task (default: 10)"); + System.err + .println("\t-keep\tkeep 
segment data (default: delete after updatedb)"); + System.err.println("\t-plugins <regex>\toverride 'plugin.includes'."); + System.err.println("\tNOTE: if not specified, this is reset to: " + + plugins); + System.err + .println("\tNOTE: if 'default' is specified then a value set in nutch-default/nutch-site is used."); + System.err + .println("\t-maxPerHost NN\tmax. # of URLs per host in a fetchlist"); + return -1; + } + int maxPerHost = Integer.MAX_VALUE; + for (int i = 0; i < args.length; i++) { + if (args[i].equals("-seeds")) { + seeds = Integer.parseInt(args[++i]); + } else if (args[i].equals("-threads")) { + threads = Integer.parseInt(args[++i]); + } else if (args[i].equals("-depth")) { + depth = Integer.parseInt(args[++i]); + } else if (args[i].equals("-keep")) { + delete = false; + } else if (args[i].equals("-plugins")) { + plugins = args[++i]; + } else if (args[i].equalsIgnoreCase("-maxPerHost")) { + maxPerHost = Integer.parseInt(args[++i]); + } else { + LOG.fatal("Invalid argument: '" + args[i] + "'"); + return -1; + } + } + BenchmarkResults res = benchmark(seeds, depth, threads, maxPerHost, topN, + delete, plugins); + System.out.println(res); + return 0; + } + + public BenchmarkResults benchmark(int seeds, int depth, int threads, + int maxPerHost, long topN, boolean delete, String plugins) + throws Exception { + Configuration conf = getConf(); + conf.set("http.proxy.host", "localhost"); + conf.setInt("http.proxy.port", 8181); + conf.set("http.agent.name", "test"); + conf.set("http.robots.agents", "test,*"); + if (!plugins.equals("default")) { + conf.set("plugin.includes", plugins); + } + conf.setInt(Generator.GENERATOR_MAX_COUNT, maxPerHost); + conf.set(Generator.GENERATOR_COUNT_MODE, + Generator.GENERATOR_COUNT_VALUE_HOST); + JobConf job = new NutchJob(getConf()); + FileSystem fs = FileSystem.get(job); + Path dir = new Path(getConf().get("hadoop.tmp.dir"), "bench-" + + System.currentTimeMillis()); + fs.mkdirs(dir); + Path rootUrlDir = new Path(dir, 
"seed"); + fs.mkdirs(rootUrlDir); + createSeeds(fs, rootUrlDir, seeds); + + if (LOG.isInfoEnabled()) { + LOG.info("crawl started in: " + dir); + LOG.info("rootUrlDir = " + rootUrlDir); + LOG.info("threads = " + threads); + LOG.info("depth = " + depth); + } + BenchmarkResults res = new BenchmarkResults(); + res.delete = delete; + res.depth = depth; + res.plugins = plugins; + res.seeds = seeds; + res.threads = threads; + res.topN = topN; + Path crawlDb = new Path(dir + "/crawldb"); + Path linkDb = new Path(dir + "/linkdb"); + Path segments = new Path(dir + "/segments"); + res.elapsed = System.currentTimeMillis(); + Injector injector = new Injector(getConf()); + Generator generator = new Generator(getConf()); + Fetcher fetcher = new Fetcher(getConf()); + ParseSegment parseSegment = new ParseSegment(getConf()); + CrawlDb crawlDbTool = new CrawlDb(getConf()); + LinkDb linkDbTool = new LinkDb(getConf()); + + // initialize crawlDb + long start = System.currentTimeMillis(); + injector.inject(crawlDb, rootUrlDir); + long delta = System.currentTimeMillis() - start; + res.addTiming("inject", "0", delta); + int i; + for (i = 0; i < depth; i++) { // generate new segment + start = System.currentTimeMillis(); + Path[] segs = generator.generate(crawlDb, segments, -1, topN, + System.currentTimeMillis()); + delta = System.currentTimeMillis() - start; + res.addTiming("generate", i + "", delta); + if (segs == null) { + LOG.info("Stopping at depth=" + i + " - no more URLs to fetch."); + break; + } + start = System.currentTimeMillis(); + fetcher.fetch(segs[0], threads); // fetch it + delta = System.currentTimeMillis() - start; + res.addTiming("fetch", i + "", delta); + if (!Fetcher.isParsing(job)) { + start = System.currentTimeMillis(); + parseSegment.parse(segs[0]); // parse it, if needed + delta = System.currentTimeMillis() - start; + res.addTiming("parse", i + "", delta); + } + start = System.currentTimeMillis(); + crawlDbTool.update(crawlDb, segs, true, true); // update crawldb + 
delta = System.currentTimeMillis() - start; + res.addTiming("update", i + "", delta); + start = System.currentTimeMillis(); + linkDbTool.invert(linkDb, segs, true, true, false); // invert links + delta = System.currentTimeMillis() - start; + res.addTiming("invert", i + "", delta); + // delete data + if (delete) { + for (Path p : segs) { + fs.delete(p, true); + } + } + } + if (i == 0) { + LOG.warn("No URLs to fetch - check your seed list and URL filters."); + } + if (LOG.isInfoEnabled()) { + LOG.info("crawl finished: " + dir); + } + res.elapsed = System.currentTimeMillis() - res.elapsed; + CrawlDbReader dbreader = new CrawlDbReader(); + dbreader.processStatJob(crawlDb.toString(), job, false); + return res; + } + +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlConfig.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlConfig.java b/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlConfig.java new file mode 100644 index 0000000..d8c06c0 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlConfig.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nutch.tools; + +import java.io.IOException; +import java.io.InputStream; +import java.io.Serializable; +import java.util.Properties; + +public class CommonCrawlConfig implements Serializable { + + /** + * Serial version UID + */ + private static final long serialVersionUID = 5235013733207799661L; + + // Prefix for key value in the output format + private String keyPrefix = ""; + + private boolean simpleDateFormat = false; + + private boolean jsonArray = false; + + private boolean reverseKey = false; + + private String reverseKeyValue = ""; + + private boolean compressed = false; + + private long warcSize = 0; + + private String outputDir; + + /** + * Default constructor + */ + public CommonCrawlConfig() { + // TODO init(this.getClass().getResourceAsStream("CommonCrawlConfig.properties")); + } + + public CommonCrawlConfig(InputStream stream) { + init(stream); + } + + private void init(InputStream stream) { + if (stream == null) { + return; + } + Properties properties = new Properties(); + + try { + properties.load(stream); + } catch (IOException e) { + // TODO + } finally { + try { + stream.close(); + } catch (IOException e) { + // TODO + } + } + + setKeyPrefix(properties.getProperty("keyPrefix", "")); + setSimpleDateFormat(Boolean.parseBoolean(properties.getProperty("simpleDateFormat", "False"))); + setJsonArray(Boolean.parseBoolean(properties.getProperty("jsonArray", "False"))); + setReverseKey(Boolean.parseBoolean(properties.getProperty("reverseKey", "False"))); + } + + public void setKeyPrefix(String keyPrefix) { + this.keyPrefix = keyPrefix; + } + + public void setSimpleDateFormat(boolean simpleDateFormat) { + this.simpleDateFormat = simpleDateFormat; + } + + public void setJsonArray(boolean jsonArray) { + this.jsonArray = jsonArray; + } + + public void setReverseKey(boolean reverseKey) { + this.reverseKey = reverseKey; + } + + 
public void setReverseKeyValue(String reverseKeyValue) { + this.reverseKeyValue = reverseKeyValue; + } + + public String getKeyPrefix() { + return this.keyPrefix; + } + + public boolean getSimpleDateFormat() { + return this.simpleDateFormat; + } + + public boolean getJsonArray() { + return this.jsonArray; + } + + public boolean getReverseKey() { + return this.reverseKey; + } + + public String getReverseKeyValue() { + return this.reverseKeyValue; + } + + public boolean isCompressed() { + return compressed; + } + + public void setCompressed(boolean compressed) { + this.compressed = compressed; + } + + public long getWarcSize() { + return warcSize; + } + + public void setWarcSize(long warcSize) { + this.warcSize = warcSize; + } + + public String getOutputDir() { + return outputDir; + } + + public void setOutputDir(String outputDir) { + this.outputDir = outputDir; + } +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlDataDumper.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlDataDumper.java b/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlDataDumper.java new file mode 100644 index 0000000..b4fc0a7 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlDataDumper.java @@ -0,0 +1,716 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nutch.tools; + +//JDK imports + +import java.io.BufferedOutputStream; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.text.ParseException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream; +import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream; +//Commons imports +import org.apache.commons.io.IOUtils; +import org.apache.commons.io.FilenameUtils; + +//Hadoop +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import 
org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.nutch.crawl.Inlink; +import org.apache.nutch.crawl.Inlinks; +import org.apache.nutch.crawl.LinkDbReader; +import org.apache.nutch.metadata.Metadata; +import org.apache.nutch.protocol.Content; +import org.apache.nutch.util.DumpFileUtil; +import org.apache.nutch.util.NutchConfiguration; +//Tika imports +import org.apache.tika.Tika; + +import com.fasterxml.jackson.dataformat.cbor.CBORFactory; +import com.fasterxml.jackson.dataformat.cbor.CBORGenerator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.ibm.icu.text.DateFormat; +import com.ibm.icu.text.SimpleDateFormat; + +/** + * <p> + * The Common Crawl Data Dumper tool enables one to reverse generate the raw + * content from Nutch segment data directories into a common crawling data + * format, consumed by many applications. The data is then serialized as <a + * href="http://cbor.io">CBOR</a> + * </p> + * <p> + * Text content will be stored in a structured document format. Below is a + * schema for storage of data and metadata related to a crawling request, with + * the response body truncated for readability. This document must be encoded + * using CBOR and should be compressed with gzip after encoding. The timestamped + * URL key for these records' keys follows the same layout as the media file + * directory structure, with underscores in place of directory separators. 
</li> + * </p> + * <p> + * Thus, the timestamped url key for the record is provided below followed by an + * example record: + * <p/> + * <pre> + * {@code + * com_somepage_33a3e36bbef59c2a5242c2ccee59239ab30d51f3_1411623696000 + * + * { + * "url": "http:\/\/somepage.com\/22\/14560817", + * "timestamp": "1411623696000", + * "request": { + * "method": "GET", + * "client": { + * "hostname": "crawler01.local", + * "address": "74.347.129.200", + * "software": "Apache Nutch v1.10", + * "robots": "classic", + * "contact": { + * "name": "Nutch Admin", + * "email": "[email protected]" + * } + * }, + * "headers": { + * "Accept": "text\/html,application\/xhtml+xml,application\/xml", + * "Accept-Encoding": "gzip,deflate,sdch", + * "Accept-Language": "en-US,en", + * "User-Agent": "Mozilla\/5.0", + * "...": "..." + * }, + * "body": null + * }, + * "response": { + * "status": "200", + * "server": { + * "hostname": "somepage.com", + * "address": "55.33.51.19", + * }, + * "headers": { + * "Content-Encoding": "gzip", + * "Content-Type": "text\/html", + * "Date": "Thu, 25 Sep 2014 04:16:58 GMT", + * "Expires": "Thu, 25 Sep 2014 04:16:57 GMT", + * "Server": "nginx", + * "...": "..." + * }, + * "body": "\r\n <!DOCTYPE html PUBLIC ... \r\n\r\n \r\n </body>\r\n </html>\r\n \r\n\r\n", + * }, + * "key": "com_somepage_33a3e36bbef59c2a5242c2ccee59239ab30d51f3_1411623696000", + * "imported": "1411623698000" + * } + * } + * </pre> + * <p/> + * <p> + * Upon successful completion the tool displays a very convenient JSON snippet + * detailing the mimetype classifications and the counts of documents which fall + * into those classifications. 
An example is as follows: + * </p> + * <p/> + * <pre> + * {@code + * INFO: File Types: + * TOTAL Stats: { + * {"mimeType":"application/xml","count":19"} + * {"mimeType":"image/png","count":47"} + * {"mimeType":"image/jpeg","count":141"} + * {"mimeType":"image/vnd.microsoft.icon","count":4"} + * {"mimeType":"text/plain","count":89"} + * {"mimeType":"video/quicktime","count":2"} + * {"mimeType":"image/gif","count":63"} + * {"mimeType":"application/xhtml+xml","count":1670"} + * {"mimeType":"application/octet-stream","count":40"} + * {"mimeType":"text/html","count":1863"} + * } + * } + * </pre> + */ +public class CommonCrawlDataDumper extends Configured implements Tool { + + private static final Logger LOG = LoggerFactory + .getLogger(CommonCrawlDataDumper.class.getName()); + private static final int MAX_INLINKS = 5000; + + private CommonCrawlConfig config = null; + + // Gzip initialization + private FileOutputStream fileOutput = null; + private BufferedOutputStream bufOutput = null; + private GzipCompressorOutputStream gzipOutput = null; + private TarArchiveOutputStream tarOutput = null; + private ArrayList<String> fileList = null; + + /** + * Main method for invoking this tool + * + * @param args 1) output directory (which will be created if it does not + * already exist) to host the CBOR data and 2) a directory + * containing one or more segments from which we wish to generate + * CBOR data from. Optionally, 3) a list of mimetypes and the 4) + * the gzip option may be provided. 
+ * @throws Exception + */ + public static void main(String[] args) throws Exception { + Configuration conf = NutchConfiguration.create(); + int res = ToolRunner.run(conf, new CommonCrawlDataDumper(), args); + System.exit(res); + } + + /** + * Constructor + */ + public CommonCrawlDataDumper(CommonCrawlConfig config) { + this.config = config; + } + + public CommonCrawlDataDumper() { + } + + /** + * Dumps the reverse engineered CBOR content from the provided segment + * directories if a parent directory contains more than one segment, + * otherwise a single segment can be passed as an argument. If the boolean + * argument is provided then the CBOR is also zipped. + * + * @param outputDir the directory you wish to dump the raw content to. This + * directory will be created. + * @param segmentRootDir a directory containing one or more segments. + * @param linkdb Path to linkdb. + * @param gzip a boolean flag indicating whether the CBOR content should also + * be gzipped. + * @param epochFilename if {@code true}, output files will be names using the epoch time (in milliseconds). + * @param extension a file extension to use with output documents. + * @throws Exception if any exception occurs. 
+ */ + public void dump(File outputDir, File segmentRootDir, File linkdb, boolean gzip, + String[] mimeTypes, boolean epochFilename, String extension, boolean warc) + throws Exception { + if (gzip) { + LOG.info("Gzipping CBOR data has been skipped"); + } + // total file counts + Map<String, Integer> typeCounts = new HashMap<String, Integer>(); + // filtered file counters + Map<String, Integer> filteredCounts = new HashMap<String, Integer>(); + + Configuration nutchConfig = NutchConfiguration.create(); + final FileSystem fs = FileSystem.get(nutchConfig); + Path segmentRootPath = new Path(segmentRootDir.toString()); + + //get all paths + List<Path> parts = new ArrayList<>(); + RemoteIterator<LocatedFileStatus> files = fs.listFiles(segmentRootPath, true); + String partPattern = ".*" + File.separator + Content.DIR_NAME + + File.separator + "part-[0-9]{5}" + File.separator + "data"; + while (files.hasNext()) { + LocatedFileStatus next = files.next(); + if (next.isFile()) { + Path path = next.getPath(); + if (path.toString().matches(partPattern)){ + parts.add(path); + } + } + } + + LinkDbReader linkDbReader = null; + if (linkdb != null) { + linkDbReader = new LinkDbReader(fs.getConf(), new Path(linkdb.toString())); + } + if (parts == null || parts.size() == 0) { + LOG.error( "No segment directories found in {} ", + segmentRootDir.getAbsolutePath()); + System.exit(1); + } + LOG.info("Found {} segment parts", parts.size()); + if (gzip && !warc) { + fileList = new ArrayList<>(); + constructNewStream(outputDir); + } + + for (Path segmentPart : parts) { + LOG.info("Processing segment Part : [ {} ]", segmentPart); + try { + SequenceFile.Reader reader = new SequenceFile.Reader(nutchConfig, + SequenceFile.Reader.file(segmentPart)); + + Writable key = (Writable) reader.getKeyClass().newInstance(); + + Content content = null; + while (reader.next(key)) { + content = new Content(); + reader.getCurrentValue(content); + Metadata metadata = content.getMetadata(); + String url = 
key.toString(); + + String baseName = FilenameUtils.getBaseName(url); + String extensionName = FilenameUtils.getExtension(url); + + if (!extension.isEmpty()) { + extensionName = extension; + } else if ((extensionName == null) || extensionName.isEmpty()) { + extensionName = "html"; + } + + String outputFullPath = null; + String outputRelativePath = null; + String filename = null; + String timestamp = null; + String reverseKey = null; + + if (epochFilename || config.getReverseKey()) { + try { + long epoch = new SimpleDateFormat("EEE, d MMM yyyy HH:mm:ss z") + .parse(getDate(metadata.get("Date"))).getTime(); + timestamp = String.valueOf(epoch); + } catch (ParseException pe) { + LOG.warn(pe.getMessage()); + } + + reverseKey = reverseUrl(url); + config.setReverseKeyValue( + reverseKey.replace("/", "_") + "_" + DigestUtils.sha1Hex(url) + + "_" + timestamp); + } + + if (!warc) { + if (epochFilename) { + outputFullPath = DumpFileUtil + .createFileNameFromUrl(outputDir.getAbsolutePath(), + reverseKey, url, timestamp, extensionName, !gzip); + outputRelativePath = outputFullPath + .substring(0, outputFullPath.lastIndexOf(File.separator) - 1); + filename = content.getMetadata().get(Metadata.DATE) + "." 
+ + extensionName; + } else { + String md5Ofurl = DumpFileUtil.getUrlMD5(url); + String fullDir = DumpFileUtil + .createTwoLevelsDirectory(outputDir.getAbsolutePath(), + md5Ofurl, !gzip); + filename = DumpFileUtil + .createFileName(md5Ofurl, baseName, extensionName); + outputFullPath = String.format("%s/%s", fullDir, filename); + + String[] fullPathLevels = fullDir.split(File.separator); + String firstLevelDirName = fullPathLevels[fullPathLevels.length + - 2]; + String secondLevelDirName = fullPathLevels[fullPathLevels.length + - 1]; + outputRelativePath = firstLevelDirName + secondLevelDirName; + } + } + // Encode all filetypes if no mimetypes have been given + Boolean filter = (mimeTypes == null); + + String jsonData = ""; + try { + String mimeType = new Tika().detect(content.getContent()); + // Maps file to JSON-based structure + + Set<String> inUrls = null; //there may be duplicates, so using set + if (linkDbReader != null) { + Inlinks inlinks = linkDbReader.getInlinks((Text) key); + if (inlinks != null) { + Iterator<Inlink> iterator = inlinks.iterator(); + inUrls = new LinkedHashSet<>(); + while (inUrls.size() <= MAX_INLINKS && iterator.hasNext()){ + inUrls.add(iterator.next().getFromUrl()); + } + } + } + //TODO: Make this Jackson Format implementation reusable + try (CommonCrawlFormat format = CommonCrawlFormatFactory + .getCommonCrawlFormat(warc ? 
"WARC" : "JACKSON", nutchConfig, config)) { + if (inUrls != null) { + format.setInLinks(new ArrayList<>(inUrls)); + } + jsonData = format.getJsonData(url, content, metadata); + } + + collectStats(typeCounts, mimeType); + // collects statistics for the given mimetypes + if ((mimeType != null) && (mimeTypes != null) && Arrays + .asList(mimeTypes).contains(mimeType)) { + collectStats(filteredCounts, mimeType); + filter = true; + } + } catch (IOException ioe) { + LOG.error("Fatal error in creating JSON data: " + ioe.getMessage()); + return; + } + + if (!warc) { + if (filter) { + byte[] byteData = serializeCBORData(jsonData); + + if (!gzip) { + File outputFile = new File(outputFullPath); + if (outputFile.exists()) { + LOG.info("Skipping writing: [" + outputFullPath + + "]: file already exists"); + } else { + LOG.info("Writing: [" + outputFullPath + "]"); + IOUtils.copy(new ByteArrayInputStream(byteData), + new FileOutputStream(outputFile)); + } + } else { + if (fileList.contains(outputFullPath)) { + LOG.info("Skipping compressing: [" + outputFullPath + + "]: file already exists"); + } else { + fileList.add(outputFullPath); + LOG.info("Compressing: [" + outputFullPath + "]"); + //TarArchiveEntry tarEntry = new TarArchiveEntry(firstLevelDirName + File.separator + secondLevelDirName + File.separator + filename); + TarArchiveEntry tarEntry = new TarArchiveEntry( + outputRelativePath + File.separator + filename); + tarEntry.setSize(byteData.length); + tarOutput.putArchiveEntry(tarEntry); + tarOutput.write(byteData); + tarOutput.closeArchiveEntry(); + } + } + } + } + } + reader.close(); + } catch (Exception e){ + LOG.warn("SKIPPED: {} Because : {}", segmentPart, e.getMessage()); + } finally { + fs.close(); + } + } + + if (gzip && !warc) { + closeStream(); + } + + if (!typeCounts.isEmpty()) { + LOG.info("CommonsCrawlDataDumper File Stats: " + DumpFileUtil + .displayFileTypes(typeCounts, filteredCounts)); + } + + } + + private void closeStream() { + try { + tarOutput.finish(); 
+ + tarOutput.close(); + gzipOutput.close(); + bufOutput.close(); + fileOutput.close(); + } catch (IOException ioe) { + LOG.warn("Error in closing stream: " + ioe.getMessage()); + } + } + + private void constructNewStream(File outputDir) throws IOException { + String archiveName = new SimpleDateFormat("yyyyMMddhhmm'.tar.gz'") + .format(new Date()); + LOG.info("Creating a new gzip archive: " + archiveName); + fileOutput = new FileOutputStream( + new File(outputDir + File.separator + archiveName)); + bufOutput = new BufferedOutputStream(fileOutput); + gzipOutput = new GzipCompressorOutputStream(bufOutput); + tarOutput = new TarArchiveOutputStream(gzipOutput); + tarOutput.setLongFileMode(TarArchiveOutputStream.LONGFILE_GNU); + } + + /** + * Writes the CBOR "Self-Describe Tag" (value 55799, serialized as 3-byte + * sequence of {@code 0xd9d9f7}) at the current position. This method must + * be used to write the CBOR magic number at the beginning of the document. + * Since version 2.5, <a + * href="https://github.com/FasterXML/jackson-dataformat-cbor" + * >jackson-dataformat-cbor</a> will support the {@code WRITE_TYPE_HEADER} + * feature to write that type tag at the beginning of the document. + * + * @param generator {@link CBORGenerator} object used to create a CBOR-encoded document. + * @throws IOException if any I/O error occurs. 
+ * @see <a href="https://tools.ietf.org/html/rfc7049#section-2.4.5">RFC + * 7049</a> + */ + private void writeMagicHeader(CBORGenerator generator) throws IOException { + // Writes self-describe CBOR + // https://tools.ietf.org/html/rfc7049#section-2.4.5 + // It will be supported in jackson-cbor since 2.5 + byte[] header = new byte[3]; + header[0] = (byte) 0xd9; + header[1] = (byte) 0xd9; + header[2] = (byte) 0xf7; + generator.writeBytes(header, 0, header.length); + } + + private byte[] serializeCBORData(String jsonData) { + CBORFactory factory = new CBORFactory(); + + CBORGenerator generator = null; + ByteArrayOutputStream stream = null; + + try { + stream = new ByteArrayOutputStream(); + generator = factory.createGenerator(stream); + // Writes CBOR tag + writeMagicHeader(generator); + generator.writeString(jsonData); + generator.flush(); + stream.flush(); + + return stream.toByteArray(); + + } catch (Exception e) { + LOG.warn("CBOR encoding failed: " + e.getMessage()); + } finally { + try { + generator.close(); + stream.close(); + } catch (IOException e) { + // nothing to do + } + } + + return null; + } + + private void collectStats(Map<String, Integer> typeCounts, String mimeType) { + typeCounts.put(mimeType, + typeCounts.containsKey(mimeType) ? typeCounts.get(mimeType) + 1 : 1); + } + + /** + * Gets the current date if the given timestamp is empty or null. + * + * @param timestamp the timestamp + * @return the current timestamp if the given one is null. 
+ */ + private String getDate(String timestamp) { + if (timestamp == null || timestamp.isEmpty()) { + DateFormat dateFormat = new SimpleDateFormat( + "EEE, d MMM yyyy HH:mm:ss z"); + timestamp = dateFormat.format(new Date()); + } + return timestamp; + + } + + public static String reverseUrl(String urlString) { + URL url; + String reverseKey = null; + try { + url = new URL(urlString); + + String[] hostPart = url.getHost().replace('.', '/').split("/"); + + StringBuilder sb = new StringBuilder(); + sb.append(hostPart[hostPart.length - 1]); + for (int i = hostPart.length - 2; i >= 0; i--) { + sb.append("/" + hostPart[i]); + } + + reverseKey = sb.toString(); + + } catch (MalformedURLException e) { + LOG.error("Failed to parse URL: {}", urlString); + } + + return reverseKey; + } + + @Override + public int run(String[] args) throws Exception { + Option helpOpt = new Option("h", "help", false, "show this help message."); + // argument options + @SuppressWarnings("static-access") + Option outputOpt = OptionBuilder.withArgName("outputDir").hasArg() + .withDescription( + "output directory (which will be created) to host the CBOR data.") + .create("outputDir"); + // WARC format + Option warcOpt = new Option("warc", "export to a WARC file"); + + @SuppressWarnings("static-access") + Option segOpt = OptionBuilder.withArgName("segment").hasArgs() + .withDescription("the segment or directory containing segments to use").create("segment"); + // create mimetype and gzip options + @SuppressWarnings("static-access") + Option mimeOpt = OptionBuilder.isRequired(false).withArgName("mimetype") + .hasArgs().withDescription( + "an optional list of mimetypes to dump, excluding all others. 
Defaults to all.") + .create("mimetype"); + @SuppressWarnings("static-access") + Option gzipOpt = OptionBuilder.withArgName("gzip").hasArg(false) + .withDescription( + "an optional flag indicating whether to additionally gzip the data.") + .create("gzip"); + @SuppressWarnings("static-access") + Option keyPrefixOpt = OptionBuilder.withArgName("keyPrefix").hasArg(true) + .withDescription("an optional prefix for key in the output format.") + .create("keyPrefix"); + @SuppressWarnings("static-access") + Option simpleDateFormatOpt = OptionBuilder.withArgName("SimpleDateFormat") + .hasArg(false).withDescription( + "an optional format for timestamp in GMT epoch milliseconds.") + .create("SimpleDateFormat"); + @SuppressWarnings("static-access") + Option epochFilenameOpt = OptionBuilder.withArgName("epochFilename") + .hasArg(false) + .withDescription("an optional format for output filename.") + .create("epochFilename"); + @SuppressWarnings("static-access") + Option jsonArrayOpt = OptionBuilder.withArgName("jsonArray").hasArg(false) + .withDescription("an optional format for JSON output.") + .create("jsonArray"); + @SuppressWarnings("static-access") + Option reverseKeyOpt = OptionBuilder.withArgName("reverseKey").hasArg(false) + .withDescription("an optional format for key value in JSON output.") + .create("reverseKey"); + @SuppressWarnings("static-access") + Option extensionOpt = OptionBuilder.withArgName("extension").hasArg(true) + .withDescription("an optional file extension for output documents.") + .create("extension"); + @SuppressWarnings("static-access") + Option sizeOpt = OptionBuilder.withArgName("warcSize").hasArg(true) + .withType(Number.class) + .withDescription("an optional file size in bytes for the WARC file(s)") + .create("warcSize"); + @SuppressWarnings("static-access") + Option linkDbOpt = OptionBuilder.withArgName("linkdb").hasArg(true) + .withDescription("an optional linkdb parameter to include inlinks in dump files") + .isRequired(false) + 
.create("linkdb"); + + // create the options + Options options = new Options(); + options.addOption(helpOpt); + options.addOption(outputOpt); + options.addOption(segOpt); + // create mimetypes and gzip options + options.addOption(warcOpt); + options.addOption(mimeOpt); + options.addOption(gzipOpt); + // create keyPrefix option + options.addOption(keyPrefixOpt); + // create simpleDataFormat option + options.addOption(simpleDateFormatOpt); + options.addOption(epochFilenameOpt); + options.addOption(jsonArrayOpt); + options.addOption(reverseKeyOpt); + options.addOption(extensionOpt); + options.addOption(sizeOpt); + options.addOption(linkDbOpt); + + CommandLineParser parser = new GnuParser(); + try { + CommandLine line = parser.parse(options, args); + if (line.hasOption("help") || !line.hasOption("outputDir") || (!line + .hasOption("segment"))) { + HelpFormatter formatter = new HelpFormatter(); + formatter + .printHelp(CommonCrawlDataDumper.class.getName(), options, true); + return 0; + } + + File outputDir = new File(line.getOptionValue("outputDir")); + File segmentRootDir = new File(line.getOptionValue("segment")); + String[] mimeTypes = line.getOptionValues("mimetype"); + boolean gzip = line.hasOption("gzip"); + boolean epochFilename = line.hasOption("epochFilename"); + + String keyPrefix = line.getOptionValue("keyPrefix", ""); + boolean simpleDateFormat = line.hasOption("SimpleDateFormat"); + boolean jsonArray = line.hasOption("jsonArray"); + boolean reverseKey = line.hasOption("reverseKey"); + String extension = line.getOptionValue("extension", ""); + boolean warc = line.hasOption("warc"); + long warcSize = 0; + + if (line.getParsedOptionValue("warcSize") != null) { + warcSize = (Long) line.getParsedOptionValue("warcSize"); + } + String linkdbPath = line.getOptionValue("linkdb"); + File linkdb = linkdbPath == null ? 
null : new File(linkdbPath); + + CommonCrawlConfig config = new CommonCrawlConfig(); + config.setKeyPrefix(keyPrefix); + config.setSimpleDateFormat(simpleDateFormat); + config.setJsonArray(jsonArray); + config.setReverseKey(reverseKey); + config.setCompressed(gzip); + config.setWarcSize(warcSize); + config.setOutputDir(line.getOptionValue("outputDir")); + + if (!outputDir.exists()) { + LOG.warn("Output directory: [" + outputDir.getAbsolutePath() + + "]: does not exist, creating it."); + if (!outputDir.mkdirs()) + throw new Exception( + "Unable to create: [" + outputDir.getAbsolutePath() + "]"); + } + + CommonCrawlDataDumper dumper = new CommonCrawlDataDumper(config); + + dumper.dump(outputDir, segmentRootDir, linkdb, gzip, mimeTypes, epochFilename, + extension, warc); + + } catch (Exception e) { + LOG.error(CommonCrawlDataDumper.class.getName() + ": " + StringUtils + .stringifyException(e)); + e.printStackTrace(); + return -1; + } + + return 0; + } +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormat.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormat.java b/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormat.java new file mode 100644 index 0000000..0834d95 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormat.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nutch.tools; + +import org.apache.nutch.metadata.Metadata; +import org.apache.nutch.parse.ParseData; +import org.apache.nutch.protocol.Content; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; + +/** + * Interface for all CommonCrawl formatter. It provides the signature for the + * method used to get JSON data. + * + * @author gtotaro + * + */ +public interface CommonCrawlFormat extends Closeable { + + /** + * + * @param mapAll If {@code true} maps all metdata on the JSON structure. + * @return the JSON data + */ + //public String getJsonData(boolean mapAll) throws IOException; + public String getJsonData() throws IOException; + + /** + * Returns a string representation of the JSON structure of the URL content + * + * @param url + * @param content + * @param metadata + * @return + */ + public String getJsonData(String url, Content content, Metadata metadata) + throws IOException; + + /** + * Returns a string representation of the JSON structure of the URL content + * takes into account the parsed metadata about the URL + * + * @param url + * @param content + * @param metadata + * @return + */ + public String getJsonData(String url, Content content, Metadata metadata, + ParseData parseData) throws IOException; + + + /** + * sets inlinks of this document + * @param inLinks list of inlinks + */ + void setInLinks(List<String> inLinks); + + + /** + * gets set of inlinks + * @return gets inlinks of this document + */ + List<String> getInLinks(); + + /** + * Optional method that could be 
implemented if the actual format needs some + * close procedure. + */ + public abstract void close(); +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatFactory.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatFactory.java b/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatFactory.java new file mode 100644 index 0000000..8814168 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatFactory.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nutch.tools; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.nutch.metadata.Metadata; +import org.apache.nutch.protocol.Content; + +/** + * Factory class that creates new {@see CommonCrawlFormat} objects (a.k.a. formatter) that map crawled files to CommonCrawl format. + * + */ +public class CommonCrawlFormatFactory { + + /** + * Returns a new instance of a {@see CommonCrawlFormat} object specifying the type of formatter. 
+ * @param formatType the type of formatter to be created. + * @param url the url. + * @param content the content. + * @param metadata the metadata. + * @param nutchConf the configuration. + * @param config the CommonCrawl output configuration. + * @return the new {@see CommonCrawlFormat} object. + * @throws IOException If any I/O error occurs. + * @deprecated + */ + public static CommonCrawlFormat getCommonCrawlFormat(String formatType, String url, Content content, Metadata metadata, Configuration nutchConf, CommonCrawlConfig config) throws IOException { + if (formatType == null) { + return null; + } + + if (formatType.equalsIgnoreCase("jackson")) { + return new CommonCrawlFormatJackson(url, content, metadata, nutchConf, config); + } + else if (formatType.equalsIgnoreCase("jettinson")) { + return new CommonCrawlFormatJettinson(url, content, metadata, nutchConf, config); + } + else if (formatType.equalsIgnoreCase("simple")) { + return new CommonCrawlFormatSimple(url, content, metadata, nutchConf, config); + } + + return null; + } + + // The format should not depend on variable attributes, essentially this + // should be one for the full job + public static CommonCrawlFormat getCommonCrawlFormat(String formatType, Configuration nutchConf, CommonCrawlConfig config) throws IOException { + if (formatType.equalsIgnoreCase("WARC")) { + return new CommonCrawlFormatWARC(nutchConf, config); + } + + if (formatType.equalsIgnoreCase("JACKSON")) { + return new CommonCrawlFormatJackson( nutchConf, config); + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatJackson.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatJackson.java b/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatJackson.java new file mode 100644 index 0000000..0d6cae2 --- /dev/null +++ 
b/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatJackson.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nutch.tools; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.nutch.metadata.Metadata; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import org.apache.nutch.protocol.Content; + +/** + * This class provides methods to map crawled data on JSON using Jackson Streaming APIs. 
+ * + */ +public class CommonCrawlFormatJackson extends AbstractCommonCrawlFormat { + + private ByteArrayOutputStream out; + + private JsonGenerator generator; + + public CommonCrawlFormatJackson(Configuration nutchConf, + CommonCrawlConfig config) throws IOException { + super(null, null, null, nutchConf, config); + + JsonFactory factory = new JsonFactory(); + this.out = new ByteArrayOutputStream(); + this.generator = factory.createGenerator(out); + + this.generator.useDefaultPrettyPrinter(); // INDENTED OUTPUT + } + + public CommonCrawlFormatJackson(String url, Content content, Metadata metadata, Configuration nutchConf, CommonCrawlConfig config) throws IOException { + super(url, content, metadata, nutchConf, config); + + JsonFactory factory = new JsonFactory(); + this.out = new ByteArrayOutputStream(); + this.generator = factory.createGenerator(out); + + this.generator.useDefaultPrettyPrinter(); // INDENTED OUTPUT + } + + @Override + protected void writeKeyValue(String key, String value) throws IOException { + generator.writeFieldName(key); + generator.writeString(value); + } + + @Override + protected void writeKeyNull(String key) throws IOException { + generator.writeFieldName(key); + generator.writeNull(); + } + + @Override + protected void startArray(String key, boolean nested, boolean newline) throws IOException { + if (key != null) { + generator.writeFieldName(key); + } + generator.writeStartArray(); + } + + @Override + protected void closeArray(String key, boolean nested, boolean newline) throws IOException { + generator.writeEndArray(); + } + + @Override + protected void writeArrayValue(String value) throws IOException { + generator.writeString(value); + } + + @Override + protected void startObject(String key) throws IOException { + if (key != null) { + generator.writeFieldName(key); + } + generator.writeStartObject(); + } + + @Override + protected void closeObject(String key) throws IOException { + generator.writeEndObject(); + } + + @Override + protected 
String generateJson() throws IOException { + this.generator.flush(); + return this.out.toString(); + } +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatJettinson.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatJettinson.java b/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatJettinson.java new file mode 100644 index 0000000..6950e2a --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatJettinson.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nutch.tools; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Deque; + +import org.apache.hadoop.conf.Configuration; +import org.apache.nutch.metadata.Metadata; +import org.apache.nutch.protocol.Content; +import org.codehaus.jettison.json.JSONArray; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; + +/** + * This class provides methods to map crawled data on JSON using Jettinson APIs. 
+ * + */ +public class CommonCrawlFormatJettinson extends AbstractCommonCrawlFormat { + + private Deque<JSONObject> stackObjects; + + private Deque<JSONArray> stackArrays; + + public CommonCrawlFormatJettinson(String url, Content content, Metadata metadata, Configuration nutchConf, CommonCrawlConfig config) throws IOException { + super(url, content, metadata, nutchConf, config); + + stackObjects = new ArrayDeque<JSONObject>(); + stackArrays = new ArrayDeque<JSONArray>(); + } + + @Override + protected void writeKeyValue(String key, String value) throws IOException { + try { + stackObjects.getFirst().put(key, value); + } catch (JSONException jsone) { + throw new IOException(jsone.getMessage()); + } + } + + @Override + protected void writeKeyNull(String key) throws IOException { + try { + stackObjects.getFirst().put(key, JSONObject.NULL); + } catch (JSONException jsone) { + throw new IOException(jsone.getMessage()); + } + } + + @Override + protected void startArray(String key, boolean nested, boolean newline) throws IOException { + JSONArray array = new JSONArray(); + stackArrays.push(array); + } + + @Override + protected void closeArray(String key, boolean nested, boolean newline) throws IOException { + try { + if (stackArrays.size() > 1) { + JSONArray array = stackArrays.pop(); + if (nested) { + stackArrays.getFirst().put(array); + } + else { + stackObjects.getFirst().put(key, array); + } + } + } catch (JSONException jsone) { + throw new IOException(jsone.getMessage()); + } + } + + @Override + protected void writeArrayValue(String value) throws IOException { + if (stackArrays.size() > 1) { + stackArrays.getFirst().put(value); + } + } + + @Override + protected void startObject(String key) throws IOException { + JSONObject object = new JSONObject(); + stackObjects.push(object); + } + + @Override + protected void closeObject(String key) throws IOException { + try { + if (stackObjects.size() > 1) { + JSONObject object = stackObjects.pop(); + 
stackObjects.getFirst().put(key, object); + } + } catch (JSONException jsone) { + throw new IOException(jsone.getMessage()); + } + } + + @Override + protected String generateJson() throws IOException { + try { + return stackObjects.getFirst().toString(2); + } catch (JSONException jsone) { + throw new IOException(jsone.getMessage()); + } + } +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatSimple.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatSimple.java b/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatSimple.java new file mode 100644 index 0000000..a1aaa44 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatSimple.java @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nutch.tools; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.nutch.metadata.Metadata; +import org.apache.nutch.protocol.Content; + +/** + * This class provides methods to map crawled data on JSON using a {@see StringBuilder} object. 
+ * + */ +public class CommonCrawlFormatSimple extends AbstractCommonCrawlFormat { + + private StringBuilder sb; + + private int tabCount; + + public CommonCrawlFormatSimple(String url, Content content, Metadata metadata, Configuration nutchConf, CommonCrawlConfig config) throws IOException { + super(url, content, metadata, nutchConf, config); + + this.sb = new StringBuilder(); + this.tabCount = 0; + } + + @Override + protected void writeKeyValue(String key, String value) throws IOException { + sb.append(printTabs() + "\"" + key + "\": " + quote(value) + ",\n"); + } + + @Override + protected void writeKeyNull(String key) throws IOException { + sb.append(printTabs() + "\"" + key + "\": null,\n"); + } + + @Override + protected void startArray(String key, boolean nested, boolean newline) throws IOException { + String name = (key != null) ? "\"" + key + "\": " : ""; + String nl = (newline) ? "\n" : ""; + sb.append(printTabs() + name + "[" + nl); + if (newline) { + this.tabCount++; + } + } + + @Override + protected void closeArray(String key, boolean nested, boolean newline) throws IOException { + if (sb.charAt(sb.length()-1) == ',') { + sb.deleteCharAt(sb.length()-1); // delete comma + } + else if (sb.charAt(sb.length()-2) == ',') { + sb.deleteCharAt(sb.length()-2); // delete comma + } + String nl = (newline) ? 
printTabs() : ""; + if (newline) { + this.tabCount++; + } + sb.append(nl + "],\n"); + } + + @Override + protected void writeArrayValue(String value) { + sb.append("\"" + value + "\","); + } + + protected void startObject(String key) throws IOException { + String name = ""; + if (key != null) { + name = "\"" + key + "\": "; + } + sb.append(printTabs() + name + "{\n"); + this.tabCount++; + } + + protected void closeObject(String key) throws IOException { + if (sb.charAt(sb.length()-2) == ',') { + sb.deleteCharAt(sb.length()-2); // delete comma + } + this.tabCount--; + sb.append(printTabs() + "},\n"); + } + + protected String generateJson() throws IOException { + sb.deleteCharAt(sb.length()-1); // delete new line + sb.deleteCharAt(sb.length()-1); // delete comma + return sb.toString(); + } + + private String printTabs() { + StringBuilder sb = new StringBuilder(); + for (int i=0; i < this.tabCount ;i++) { + sb.append("\t"); + } + return sb.toString(); + } + + private static String quote(String string) throws IOException { + StringBuilder sb = new StringBuilder(); + + if (string == null || string.length() == 0) { + sb.append("\"\""); + return sb.toString(); + } + + char b; + char c = 0; + String hhhh; + int i; + int len = string.length(); + + sb.append('"'); + for (i = 0; i < len; i += 1) { + b = c; + c = string.charAt(i); + switch (c) { + case '\\': + case '"': + sb.append('\\'); + sb.append(c); + break; + case '/': + if (b == '<') { + sb.append('\\'); + } + sb.append(c); + break; + case '\b': + sb.append("\\b"); + break; + case '\t': + sb.append("\\t"); + break; + case '\n': + sb.append("\\n"); + break; + case '\f': + sb.append("\\f"); + break; + case '\r': + sb.append("\\r"); + break; + default: + if (c < ' ' || (c >= '\u0080' && c < '\u00a0') + || (c >= '\u2000' && c < '\u2100')) { + sb.append("\\u"); + hhhh = Integer.toHexString(c); + sb.append("0000", 0, 4 - hhhh.length()); + sb.append(hhhh); + } else { + sb.append(c); + } + } + } + sb.append('"'); + return 
sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatWARC.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatWARC.java b/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatWARC.java new file mode 100644 index 0000000..191e42e --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatWARC.java @@ -0,0 +1,286 @@ +package org.apache.nutch.tools; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.text.ParseException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Date; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import com.ibm.icu.text.SimpleDateFormat; +import org.apache.commons.lang.NotImplementedException; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.nutch.metadata.Metadata; +import org.apache.nutch.parse.ParseData; + +import org.apache.nutch.parse.ParseSegment; +import org.apache.nutch.protocol.Content; +import org.archive.format.warc.WARCConstants; +import org.archive.io.WriterPoolMember; +import org.archive.io.warc.WARCRecordInfo; +import org.archive.io.warc.WARCWriter; +import org.archive.io.warc.WARCWriterPoolSettingsData; +import org.archive.uid.UUIDGenerator; +import org.archive.util.DateUtils; +import org.archive.util.anvl.ANVLRecord; + +public class CommonCrawlFormatWARC extends AbstractCommonCrawlFormat { + + public static final String MAX_WARC_FILE_SIZE = "warc.file.size.max"; + public static final String TEMPLATE = "${prefix}-${timestamp17}-${serialno}"; + + private static final AtomicInteger SERIALNO = new AtomicInteger(); + private final static UUIDGenerator GENERATOR = new 
UUIDGenerator(); + + private String outputDir = null; + private ByteArrayOutputStream out; + private WARCWriter writer; + private ParseData parseData; + + public CommonCrawlFormatWARC(Configuration nutchConf, + CommonCrawlConfig config) throws IOException { + super(null, null, null, nutchConf, config); + + this.out = new ByteArrayOutputStream(); + + ANVLRecord info = WARCUtils.getWARCInfoContent(nutchConf); + List<String> md = Collections.singletonList(info.toString()); + + this.outputDir = config.getOutputDir(); + + if (null == outputDir) { + String message = "Missing output directory configuration: " + outputDir; + + throw new RuntimeException(message); + } + + File file = new File(outputDir); + + long maxSize = WARCConstants.DEFAULT_MAX_WARC_FILE_SIZE; + + if (config.getWarcSize() > 0) { + maxSize = config.getWarcSize(); + } + + WARCWriterPoolSettingsData settings = new WARCWriterPoolSettingsData( + WriterPoolMember.DEFAULT_PREFIX, TEMPLATE, maxSize, + config.isCompressed(), Arrays.asList(new File[] { file }), md, + new UUIDGenerator()); + + writer = new WARCWriter(SERIALNO, settings); + } + + public CommonCrawlFormatWARC(String url, Content content, Metadata metadata, + Configuration nutchConf, CommonCrawlConfig config, ParseData parseData) + throws IOException { + super(url, content, metadata, nutchConf, config); + + this.out = new ByteArrayOutputStream(); + this.parseData = parseData; + + ANVLRecord info = WARCUtils.getWARCInfoContent(conf); + List<String> md = Collections.singletonList(info.toString()); + + this.outputDir = config.getOutputDir(); + + if (null == outputDir) { + String message = "Missing output directory configuration: " + outputDir; + + throw new RuntimeException(message); + } + + File file = new File(outputDir); + + long maxSize = WARCConstants.DEFAULT_MAX_WARC_FILE_SIZE; + + if (config.getWarcSize() > 0) { + maxSize = config.getWarcSize(); + } + + WARCWriterPoolSettingsData settings = new WARCWriterPoolSettingsData( + 
WriterPoolMember.DEFAULT_PREFIX, TEMPLATE, maxSize, + config.isCompressed(), Arrays.asList(new File[] { file }), md, + new UUIDGenerator()); + + writer = new WARCWriter(SERIALNO, settings); + } + + public String getJsonData(String url, Content content, Metadata metadata, + ParseData parseData) throws IOException { + this.url = url; + this.content = content; + this.metadata = metadata; + this.parseData = parseData; + + return this.getJsonData(); + } + + @Override + public String getJsonData() throws IOException { + + long position = writer.getPosition(); + + try { + // See if we need to open a new file because we've exceeded maxBytes + + // checkSize will open a new file if we exceeded the maxBytes setting + writer.checkSize(); + + if (writer.getPosition() != position) { + // We just closed the file because it was larger than maxBytes. + position = writer.getPosition(); + } + + // response record + URI id = writeResponse(); + + if (StringUtils.isNotBlank(metadata.get("_request_"))) { + // write the request method if any request info is found + writeRequest(id); + } + } catch (IOException e) { + // Launch the corresponding IO error + throw e; + } catch (ParseException e) { + // do nothing, as we can't establish a valid WARC-Date for this record + // lets skip it altogether + LOG.error("Can't get a valid date from: {}", url); + } + + return null; + } + + protected URI writeResponse() throws IOException, ParseException { + WARCRecordInfo record = new WARCRecordInfo(); + + record.setType(WARCConstants.WARCRecordType.response); + record.setUrl(getUrl()); + + String fetchTime; + + record.setCreate14DigitDate(DateUtils + .getLog14Date(Long.parseLong(metadata.get("nutch.fetch.time")))); + record.setMimetype(WARCConstants.HTTP_RESPONSE_MIMETYPE); + record.setRecordId(GENERATOR.getRecordID()); + + String IP = getResponseAddress(); + + if (StringUtils.isNotBlank(IP)) + record.addExtraHeader(WARCConstants.HEADER_KEY_IP, IP); + + if (ParseSegment.isTruncated(content)) + 
record.addExtraHeader(WARCConstants.HEADER_KEY_TRUNCATED, "unspecified"); + + ByteArrayOutputStream output = new ByteArrayOutputStream(); + + String httpHeaders = metadata.get("_response.headers_"); + + if (StringUtils.isNotBlank(httpHeaders)) { + output.write(httpHeaders.getBytes()); + } else { + // change the record type to resource as we not have information about + // the headers + record.setType(WARCConstants.WARCRecordType.resource); + record.setMimetype(content.getContentType()); + } + + output.write(getResponseContent().getBytes()); + + record.setContentLength(output.size()); + record.setContentStream(new ByteArrayInputStream(output.toByteArray())); + + if (output.size() > 0) { + // avoid generating a 0 sized record, as the webarchive library will + // complain about it + writer.writeRecord(record); + } + + return record.getRecordId(); + } + + protected URI writeRequest(URI id) throws IOException, ParseException { + WARCRecordInfo record = new WARCRecordInfo(); + + record.setType(WARCConstants.WARCRecordType.request); + record.setUrl(getUrl()); + record.setCreate14DigitDate(DateUtils + .getLog14Date(Long.parseLong(metadata.get("nutch.fetch.time")))); + record.setMimetype(WARCConstants.HTTP_REQUEST_MIMETYPE); + record.setRecordId(GENERATOR.getRecordID()); + + if (id != null) { + ANVLRecord headers = new ANVLRecord(); + headers.addLabelValue(WARCConstants.HEADER_KEY_CONCURRENT_TO, + '<' + id.toString() + '>'); + record.setExtraHeaders(headers); + } + + ByteArrayOutputStream output = new ByteArrayOutputStream(); + + output.write(metadata.get("_request_").getBytes()); + record.setContentLength(output.size()); + record.setContentStream(new ByteArrayInputStream(output.toByteArray())); + + writer.writeRecord(record); + + return record.getRecordId(); + } + + @Override + protected String generateJson() throws IOException { + return null; + } + + @Override + protected void writeKeyValue(String key, String value) throws IOException { + throw new 
NotImplementedException(); + } + + @Override + protected void writeKeyNull(String key) throws IOException { + throw new NotImplementedException(); + } + + @Override + protected void startArray(String key, boolean nested, boolean newline) + throws IOException { + throw new NotImplementedException(); + } + + @Override + protected void closeArray(String key, boolean nested, boolean newline) + throws IOException { + throw new NotImplementedException(); + } + + @Override + protected void writeArrayValue(String value) throws IOException { + throw new NotImplementedException(); + } + + @Override + protected void startObject(String key) throws IOException { + throw new NotImplementedException(); + } + + @Override + protected void closeObject(String key) throws IOException { + throw new NotImplementedException(); + } + + @Override + public void close() { + if (writer != null) + try { + writer.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } +}
