http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/warc/WARCExporter.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/warc/WARCExporter.java b/nutch-core/src/main/java/org/apache/nutch/tools/warc/WARCExporter.java new file mode 100644 index 0000000..2e50105 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/tools/warc/WARCExporter.java @@ -0,0 +1,333 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nutch.tools.warc; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.IOException; +import java.net.URI; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.Iterator; +import java.util.List; +import java.util.Locale; +import java.util.UUID; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.RunningJob; +import org.apache.hadoop.mapred.SequenceFileInputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.nutch.crawl.CrawlDatum; +import org.apache.nutch.crawl.NutchWritable; +import org.apache.nutch.parse.ParseSegment; +import org.apache.nutch.protocol.Content; +import org.apache.nutch.util.HadoopFSUtil; +import org.apache.nutch.util.NutchConfiguration; +import org.apache.nutch.util.NutchJob; +import org.apache.nutch.util.TimingUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.martinkl.warc.WARCRecord; +import com.martinkl.warc.WARCWritable; +import com.martinkl.warc.mapred.WARCOutputFormat; + +/** + * MapReduce job to exports Nutch segments as WARC files. 
The file format is + * documented in the [ISO + * Standard](http://bibnum.bnf.fr/warc/WARC_ISO_28500_version1_latestdraft.pdf). + * Generates elements of type response if the configuration 'store.http.headers' + * was set to true during the fetching and the http headers were stored + * verbatim; generates elements of type 'resource' otherwise. + **/ + +public class WARCExporter extends Configured implements Tool { + + public static Logger LOG = LoggerFactory.getLogger(WARCExporter.class); + + private static final String CRLF = "\r\n"; + private static final byte[] CRLF_BYTES = { 13, 10 }; + + public WARCExporter() { + super(null); + } + + public WARCExporter(Configuration conf) { + super(conf); + } + + public static class WARCReducer + implements Mapper<Text, Writable, Text, NutchWritable>, + Reducer<Text, NutchWritable, NullWritable, WARCWritable> { + + SimpleDateFormat warcdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'", + Locale.ENGLISH); + + @Override + public void configure(JobConf job) { + } + + @Override + public void close() throws IOException { + } + + public void map(Text key, Writable value, + OutputCollector<Text, NutchWritable> output, Reporter reporter) + throws IOException { + output.collect(key, new NutchWritable(value)); + } + + @Override + public void reduce(Text key, Iterator<NutchWritable> values, + OutputCollector<NullWritable, WARCWritable> output, Reporter reporter) + throws IOException { + + Content content = null; + CrawlDatum cd = null; + + // aggregate the values found + while (values.hasNext()) { + final Writable value = values.next().get(); // unwrap + if (value instanceof Content) { + content = (Content) value; + continue; + } + if (value instanceof CrawlDatum) { + cd = (CrawlDatum) value; + continue; + } + } + + // check that we have everything we need + if (content == null) { + LOG.info("Missing content for {}", key); + reporter.getCounter("WARCExporter", "missing content").increment(1); + return; + } + + if (cd == null) { + 
LOG.info("Missing fetch datum for {}", key); + reporter.getCounter("WARCExporter", "missing metadata").increment(1); + return; + } + + // were the headers stored as is? Can write a response element then + String headersVerbatim = content.getMetadata().get("_response.headers_"); + byte[] httpheaders = new byte[0]; + if (StringUtils.isNotBlank(headersVerbatim)) { + // check that ends with an empty line + if (!headersVerbatim.endsWith(CRLF + CRLF)) { + headersVerbatim += CRLF + CRLF; + } + httpheaders = headersVerbatim.getBytes(); + } + + StringBuilder buffer = new StringBuilder(); + buffer.append(WARCRecord.WARC_VERSION); + buffer.append(CRLF); + + buffer.append("WARC-Record-ID").append(": ").append("<urn:uuid:") + .append(UUID.randomUUID().toString()).append(">").append(CRLF); + + int contentLength = 0; + if (content != null) { + contentLength = content.getContent().length; + } + + // add the length of the http header + contentLength += httpheaders.length; + + buffer.append("Content-Length").append(": ") + .append(Integer.toString(contentLength)).append(CRLF); + + Date fetchedDate = new Date(cd.getFetchTime()); + buffer.append("WARC-Date").append(": ").append(warcdf.format(fetchedDate)) + .append(CRLF); + + // check if http headers have been stored verbatim + // if not generate a response instead + String WARCTypeValue = "resource"; + + if (StringUtils.isNotBlank(headersVerbatim)) { + WARCTypeValue = "response"; + } + + buffer.append("WARC-Type").append(": ").append(WARCTypeValue) + .append(CRLF); + + // "WARC-IP-Address" if present + String IP = content.getMetadata().get("_ip_"); + if (StringUtils.isNotBlank(IP)) { + buffer.append("WARC-IP-Address").append(": ").append("IP").append(CRLF); + } + + // detect if truncated only for fetch success + String status = CrawlDatum.getStatusName(cd.getStatus()); + if (status.equalsIgnoreCase("STATUS_FETCH_SUCCESS") + && ParseSegment.isTruncated(content)) { + buffer.append("WARC-Truncated").append(": ").append("unspecified") + 
.append(CRLF); + } + + // must be a valid URI + try { + String normalised = key.toString().replaceAll(" ", "%20"); + URI uri = URI.create(normalised); + buffer.append("WARC-Target-URI").append(": ") + .append(uri.toASCIIString()).append(CRLF); + } catch (Exception e) { + LOG.error("Invalid URI {} ", key); + reporter.getCounter("WARCExporter", "invalid URI").increment(1); + return; + } + + // provide a ContentType if type response + if (WARCTypeValue.equals("response")) { + buffer.append("Content-Type: application/http; msgtype=response") + .append(CRLF); + } + + // finished writing the WARC headers, now let's serialize it + + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + + // store the headers + bos.write(buffer.toString().getBytes("UTF-8")); + bos.write(CRLF_BYTES); + // the http headers + bos.write(httpheaders); + + // the binary content itself + if (content.getContent() != null) { + bos.write(content.getContent()); + } + bos.write(CRLF_BYTES); + bos.write(CRLF_BYTES); + + try { + DataInput in = new DataInputStream( + new ByteArrayInputStream(bos.toByteArray())); + WARCRecord record = new WARCRecord(in); + output.collect(NullWritable.get(), new WARCWritable(record)); + reporter.getCounter("WARCExporter", "records generated").increment(1); + } catch (IOException exception) { + LOG.error("Exception when generating WARC record for {} : {}", key, + exception.getMessage()); + reporter.getCounter("WARCExporter", "exception").increment(1); + } + + } + } + + public int generateWARC(String output, List<Path> segments) { + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + long start = System.currentTimeMillis(); + LOG.info("WARCExporter: starting at {}", sdf.format(start)); + + final JobConf job = new NutchJob(getConf()); + job.setJobName("warc-exporter " + output); + + for (final Path segment : segments) { + LOG.info("warc-exporter: adding segment: {}", segment); + FileInputFormat.addInputPath(job, new Path(segment, Content.DIR_NAME)); + 
FileInputFormat.addInputPath(job, + new Path(segment, CrawlDatum.FETCH_DIR_NAME)); + } + + job.setInputFormat(SequenceFileInputFormat.class); + + job.setMapperClass(WARCReducer.class); + job.setReducerClass(WARCReducer.class); + + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(NutchWritable.class); + + FileOutputFormat.setOutputPath(job, new Path(output)); + // using the old api + job.setOutputFormat(WARCOutputFormat.class); + + job.setOutputKeyClass(NullWritable.class); + job.setOutputValueClass(WARCWritable.class); + + try { + RunningJob rj = JobClient.runJob(job); + LOG.info(rj.getCounters().toString()); + long end = System.currentTimeMillis(); + LOG.info("WARCExporter: finished at {}, elapsed: {}", sdf.format(end), + TimingUtil.elapsedTime(start, end)); + } catch (Exception e) { + LOG.error("Exception caught", e); + return -1; + } + + return 0; + } + + public int run(String[] args) throws Exception { + if (args.length < 2) { + System.err.println( + "Usage: WARCExporter <output> (<segment> ... | -dir <segments>)"); + return -1; + } + + final List<Path> segments = new ArrayList<Path>(); + + for (int i = 1; i < args.length; i++) { + if (args[i].equals("-dir")) { + Path dir = new Path(args[++i]); + FileSystem fs = dir.getFileSystem(getConf()); + FileStatus[] fstats = fs.listStatus(dir, + HadoopFSUtil.getPassDirectoriesFilter(fs)); + Path[] files = HadoopFSUtil.getPaths(fstats); + for (Path p : files) { + segments.add(p); + } + } else { + segments.add(new Path(args[i])); + } + } + + return generateWARC(args[0], segments); + } + + public static void main(String[] args) throws Exception { + final int res = ToolRunner.run(NutchConfiguration.create(), + new WARCExporter(), args); + System.exit(res); + } +}
// http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/warc/package-info.java
// ----------------------------------------------------------------------
// diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/warc/package-info.java b/nutch-core/src/main/java/org/apache/nutch/tools/warc/package-info.java
// new file mode 100644
// index 0000000..44e1a94
// --- /dev/null
// +++ b/nutch-core/src/main/java/org/apache/nutch/tools/warc/package-info.java
// @@ -0,0 +1,23 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

/**
 * Tools to import / export between Nutch segments and
 * <a href="http://bibnum.bnf.fr/warc/WARC_ISO_28500_version1_latestdraft.pdf">
 * WARC archives</a>.
 */
package org.apache.nutch.tools.warc;

// http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/util/CommandRunner.java
// ----------------------------------------------------------------------
// diff --git a/nutch-core/src/main/java/org/apache/nutch/util/CommandRunner.java b/nutch-core/src/main/java/org/apache/nutch/util/CommandRunner.java
// new file mode 100644
// index 0000000..593d590
// --- /dev/null
// +++ b/nutch-core/src/main/java/org/apache/nutch/util/CommandRunner.java
// @@ -0,0 +1,291 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/*
 * Adopted by John Xing for Nutch Project from
 * http://blog.fivesight.com/prb/space/Call+an+External+Command+from+Java/,
 * which explains the code in detail.
 * [Original author is moving his site to http://mult.ifario.us/ -peb]
 *
 * Comments by John Xing on 20040621:
 * (1) EDU.oswego.cs.dl.util.concurrent.* is in j2sdk 1.5 now.
 *     Modifications are needed if we move to j2sdk 1.5.
 * (2) The original looks good, not much to change.
 *
 * This code is in the public domain and comes with no warranty.
 */
package org.apache.nutch.util;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.InterruptedIOException;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Runs an external command and pumps data between it and caller-supplied
 * streams. An optional {@link InputStream} is copied to the process' stdin.
 * The process' stdout and stderr are copied to the configured output streams.
 * Each stream is served by its own daemon thread. The calling thread waits on
 * a {@link CyclicBarrier} until all of them have finished or the timeout (in
 * seconds) expires.
 */
public class CommandRunner {

  private boolean _waitForExit = true;
  private String _command;
  private int _timeout = 10;

  private InputStream _stdin;
  private OutputStream _stdout;
  private OutputStream _stderr;

  private static final int BUF = 4096;

  private int _xit;

  // written by the pumper threads, read by the calling thread
  private volatile Throwable _thrownError;

  private CyclicBarrier _barrier;

  public int getExitValue() {
    return _xit;
  }

  public void setCommand(String s) {
    _command = s;
  }

  public String getCommand() {
    return _command;
  }

  public void setInputStream(InputStream is) {
    _stdin = is;
  }

  public void setStdOutputStream(OutputStream os) {
    _stdout = os;
  }

  public void setStdErrorStream(OutputStream os) {
    _stderr = os;
  }

  public void evaluate() throws IOException {
    this.exec();
  }

  /**
   * Starts the command and waits for its streams to drain. The wait is
   * bounded by {@link #getTimeout()} seconds, and 0 means no time limit. If
   * {@link #getWaitForExit()} is true, the process is then polled once per
   * second for its exit value until the deadline passes or the calling thread
   * is interrupted. After that the process is destroyed.
   *
   * @return process exit value (return code) or -1 if timed out, interrupted
   *         or the exit value was not available.
   * @throws IOException
   *           if the process cannot be started
   */
  public int exec() throws IOException {
    // NOTE(review): Runtime.exec(String) splits the command on whitespace and
    // ignores shell quoting; never pass untrusted input here.
    Process proc = Runtime.getRuntime().exec(_command);
    // one party per pumper thread plus the calling thread
    _barrier = new CyclicBarrier(3 + ((_stdin != null) ? 1 : 0));

    PullerThread so = new PullerThread("STDOUT", proc.getInputStream(), _stdout);
    so.setDaemon(true);
    so.start();

    PullerThread se = new PullerThread("STDERR", proc.getErrorStream(), _stderr);
    se.setDaemon(true);
    se.start();

    PusherThread si = null;
    if (_stdin != null) {
      si = new PusherThread("STDIN", _stdin, proc.getOutputStream());
      si.setDaemon(true);
      si.start();
    }

    boolean timedOut = false;
    // 0 means "no time limit" (matching the untimed barrier wait below);
    // long arithmetic keeps large timeouts from overflowing int
    long end = (_timeout == 0) ? Long.MAX_VALUE
        : System.currentTimeMillis() + _timeout * 1000L;

    // wait until every pumper thread has drained its stream
    try {
      if (_timeout == 0) {
        _barrier.await();
      } else {
        _barrier.await(_timeout, TimeUnit.SECONDS);
      }
    } catch (TimeoutException ex) {
      timedOut = true;
    } catch (BrokenBarrierException bbe) {
      /* IGNORE */
    } catch (InterruptedException e) {
      // keep the interrupt visible to the caller and to the exit wait below
      Thread.currentThread().interrupt();
    }

    // tell the io threads we are finished
    if (si != null) {
      si.interrupt();
    }
    so.interrupt();
    se.interrupt();

    _xit = -1;

    if (!timedOut) {
      if (_waitForExit) {
        _xit = waitForExitValue(proc, end);
      } else {
        try {
          _xit = proc.exitValue();
        } catch (IllegalThreadStateException itse) {
          // still running: report -1
        }
      }
    }

    if (_waitForExit) {
      proc.destroy();
    }
    return _xit;
  }

  /**
   * Polls the process once per second until it exits, the deadline passes or
   * the calling thread is interrupted (the interrupt flag is left set).
   *
   * @return the exit value, or -1 if it could not be obtained
   */
  private static int waitForExitValue(Process proc, long deadline) {
    while (true) {
      try {
        Thread.sleep(1000);
        return proc.exitValue();
      } catch (InterruptedException ie) {
        // stop waiting on an interrupt for this thread; sleep() cleared the
        // flag, so restore it for the caller
        Thread.currentThread().interrupt();
        return -1;
      } catch (IllegalThreadStateException itse) {
        // not exited yet
      }
      if (System.currentTimeMillis() > deadline) {
        return -1;
      }
    }
  }

  public Throwable getThrownError() {
    return _thrownError;
  }

  /**
   * Copies bytes from one stream to another until EOF or interruption, then
   * closes one side and arrives at the shared barrier.
   */
  private class PumperThread extends Thread {

    private OutputStream _os;
    private InputStream _is;

    // true: close the source when done (stdout/stderr pullers);
    // false: close the sink, i.e. the process' stdin (stdin pusher)
    private boolean _closeInput;

    protected PumperThread(String name, InputStream is, OutputStream os,
        boolean closeInput) {
      super(name);
      _is = is;
      _os = os;
      _closeInput = closeInput;
    }

    @Override
    public void run() {
      try {
        byte[] buf = new byte[BUF];
        int read = 0;
        while (!isInterrupted() && (read = _is.read(buf)) != -1) {
          if (read == 0) {
            continue;
          }
          _os.write(buf, 0, read);
          _os.flush();
        }
      } catch (InterruptedIOException iioe) {
        // ignored
      } catch (Throwable t) {
        _thrownError = t;
      } finally {
        try {
          if (_closeInput) {
            _is.close();
          } else {
            _os.close();
          }
        } catch (IOException ioe) {
          /* IGNORE */
        }
      }
      try {
        _barrier.await();
      } catch (InterruptedException ie) {
        /* IGNORE: the thread is finishing anyway */
      } catch (BrokenBarrierException bbe) {
        /* IGNORE */
      }
    }
  }

  private class PusherThread extends PumperThread {
    PusherThread(String name, InputStream is, OutputStream os) {
      super(name, is, os, false);
    }
  }

  private class PullerThread extends PumperThread {
    PullerThread(String name, InputStream is, OutputStream os) {
      super(name, is, os, true);
    }
  }

  public int getTimeout() {
    return _timeout;
  }

  public void setTimeout(int timeout) {
    _timeout = timeout;
  }

  public boolean getWaitForExit() {
    return _waitForExit;
  }

  public void setWaitForExit(boolean waitForExit) {
    _waitForExit = waitForExit;
  }

  public static void main(String[] args) throws Exception {
    String commandPath = null;
    String filePath = null;
    int timeout = 10;

    String usage = "Usage: CommandRunner [-timeout timeoutSecs] commandPath filePath";

    if (args.length < 2) {
      System.err.println(usage);
      System.exit(-1);
    }

    for (int i = 0; i < args.length; i++) {
      if (args[i].equals("-timeout")) {
        if (i + 1 >= args.length) {
          // "-timeout" needs a value
          System.err.println(usage);
          System.exit(-1);
        }
        timeout = Integer.parseInt(args[++i]);
      } else if (i != args.length - 2) {
        System.err.println(usage);
        System.exit(-1);
      } else {
        commandPath = args[i];
        filePath = args[++i];
      }
    }

    if (commandPath == null) {
      // e.g. only "-timeout N" was given
      System.err.println(usage);
      System.exit(-1);
    }

    CommandRunner cr = new CommandRunner();

    cr.setCommand(commandPath);
    java.io.FileInputStream in = new java.io.FileInputStream(filePath);
    try {
      cr.setInputStream(in);
      cr.setStdErrorStream(System.err);
      cr.setStdOutputStream(System.out);

      cr.setTimeout(timeout);

      cr.evaluate();
    } finally {
      in.close();
    }

    System.err.println("output value: " + cr.getExitValue());
  }
}
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/util/CrawlCompletionStats.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/util/CrawlCompletionStats.java b/nutch-core/src/main/java/org/apache/nutch/util/CrawlCompletionStats.java new file mode 100644 index 0000000..8aafe59 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/util/CrawlCompletionStats.java @@ -0,0 +1,245 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nutch.util; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.text.SimpleDateFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.nutch.crawl.CrawlDatum; +import org.apache.nutch.util.NutchConfiguration; +import org.apache.nutch.util.TimingUtil; +import org.apache.nutch.util.URLUtil; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.MissingOptionException; + +/** + * Extracts some simple crawl completion stats from the crawldb + * + * Stats will be sorted by host/domain and will be of the form: + * 1 www.spitzer.caltech.edu FETCHED + * 50 www.spitzer.caltech.edu UNFETCHED + * + */ +public class CrawlCompletionStats extends Configured implements Tool { + + private static final Logger LOG = LoggerFactory + .getLogger(CrawlCompletionStats.class); + + private static final int MODE_HOST = 1; + private static final int MODE_DOMAIN = 2; + + private int mode = 0; + + public int run(String[] args) throws Exception { + Option 
helpOpt = new Option("h", "help", false, "Show this message"); + Option inDirs = OptionBuilder + .withArgName("inputDirs") + .isRequired() + .withDescription("Comma separated list of crawl directories (e.g., \"./crawl1,./crawl2\")") + .hasArgs() + .create("inputDirs"); + Option outDir = OptionBuilder + .withArgName("outputDir") + .isRequired() + .withDescription("Output directory where results should be dumped") + .hasArgs() + .create("outputDir"); + Option modeOpt = OptionBuilder + .withArgName("mode") + .isRequired() + .withDescription("Set statistics gathering mode (by 'host' or by 'domain')") + .hasArgs() + .create("mode"); + Option numReducers = OptionBuilder + .withArgName("numReducers") + .withDescription("Optional number of reduce jobs to use. Defaults to 1") + .hasArgs() + .create("numReducers"); + + Options options = new Options(); + options.addOption(helpOpt); + options.addOption(inDirs); + options.addOption(outDir); + options.addOption(modeOpt); + options.addOption(numReducers); + + CommandLineParser parser = new GnuParser(); + CommandLine cli; + + try { + cli = parser.parse(options, args); + } catch (MissingOptionException e) { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp("CrawlCompletionStats", options, true); + return 1; + } + + if (cli.hasOption("help")) { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp("CrawlCompletionStats", options, true); + return 1; + } + + String inputDir = cli.getOptionValue("inputDirs"); + String outputDir = cli.getOptionValue("outputDir"); + + int numOfReducers = 1; + if (cli.hasOption("numReducers")) { + numOfReducers = Integer.parseInt(args[3]); + } + + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + long start = System.currentTimeMillis(); + LOG.info("CrawlCompletionStats: starting at {}", sdf.format(start)); + + int mode = 0; + String jobName = "CrawlCompletionStats"; + if (cli.getOptionValue("mode").equals("host")) { + jobName = "Host 
CrawlCompletionStats"; + mode = MODE_HOST; + } else if (cli.getOptionValue("mode").equals("domain")) { + jobName = "Domain CrawlCompletionStats"; + mode = MODE_DOMAIN; + } + + Configuration conf = getConf(); + conf.setInt("domain.statistics.mode", mode); + conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false); + + Job job = Job.getInstance(conf, jobName); + job.setJarByClass(CrawlCompletionStats.class); + + String[] inputDirsSpecs = inputDir.split(","); + for (int i = 0; i < inputDirsSpecs.length; i++) { + File completeInputPath = new File(new File(inputDirsSpecs[i]), "crawldb/current"); + FileInputFormat.addInputPath(job, new Path(completeInputPath.toString())); + + } + + job.setInputFormatClass(SequenceFileInputFormat.class); + FileOutputFormat.setOutputPath(job, new Path(outputDir)); + job.setOutputFormatClass(TextOutputFormat.class); + + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(LongWritable.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(LongWritable.class); + + job.setMapperClass(CrawlCompletionStatsMapper.class); + job.setReducerClass(CrawlCompletionStatsReducer.class); + job.setCombinerClass(CrawlCompletionStatsCombiner.class); + job.setNumReduceTasks(numOfReducers); + + try { + job.waitForCompletion(true); + } catch (Exception e) { + throw e; + } + + long end = System.currentTimeMillis(); + LOG.info("CrawlCompletionStats: finished at {}, elapsed: {}", + sdf.format(end), TimingUtil.elapsedTime(start, end)); + return 0; + } + + static class CrawlCompletionStatsMapper extends + Mapper<Text, CrawlDatum, Text, LongWritable> { + int mode = 0; + + public void setup(Context context) { + mode = context.getConfiguration().getInt("domain.statistics.mode", MODE_DOMAIN); + } + + public void map(Text urlText, CrawlDatum datum, Context context) + throws IOException, InterruptedException { + + URL url = new URL(urlText.toString()); + String out = ""; + switch (mode) { + case MODE_HOST: + out = 
url.getHost(); + break; + case MODE_DOMAIN: + out = URLUtil.getDomainName(url); + break; + } + + if (datum.getStatus() == CrawlDatum.STATUS_DB_FETCHED + || datum.getStatus() == CrawlDatum.STATUS_DB_NOTMODIFIED) { + context.write(new Text(out + " FETCHED"), new LongWritable(1)); + } else { + context.write(new Text(out + " UNFETCHED"), new LongWritable(1)); + } + } + } + + static class CrawlCompletionStatsReducer extends + Reducer<Text, LongWritable, LongWritable, Text> { + public void reduce(Text key, Iterable<LongWritable> values, Context context) + throws IOException, InterruptedException { + long total = 0; + + for (LongWritable val : values) { + total += val.get(); + } + + context.write(new LongWritable(total), key); + } + } + + public static class CrawlCompletionStatsCombiner extends + Reducer<Text, LongWritable, Text, LongWritable> { + public void reduce(Text key, Iterable<LongWritable> values, Context context) + throws IOException, InterruptedException { + long total = 0; + + for (LongWritable val : values) { + total += val.get(); + } + context.write(key, new LongWritable(total)); + } + } + + public static void main(String[] args) throws Exception { + ToolRunner.run(NutchConfiguration.create(), new CrawlCompletionStats(), args); + } +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/util/DeflateUtils.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/util/DeflateUtils.java b/nutch-core/src/main/java/org/apache/nutch/util/DeflateUtils.java new file mode 100644 index 0000000..5863522 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/util/DeflateUtils.java @@ -0,0 +1,140 @@ +/** + * Copyright 2005 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nutch.util; + +import java.io.ByteArrayOutputStream; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.zip.Inflater; +import java.util.zip.InflaterInputStream; +import java.util.zip.DeflaterOutputStream; + +// Slf4j Logging imports +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A collection of utility methods for working on deflated data. + */ +public class DeflateUtils { + + private static final Logger LOG = LoggerFactory.getLogger(DeflateUtils.class); + private static final int EXPECTED_COMPRESSION_RATIO = 5; + private static final int BUF_SIZE = 4096; + + /** + * Returns an inflated copy of the input array. If the deflated input has been + * truncated or corrupted, a best-effort attempt is made to inflate as much as + * possible. If no data can be extracted <code>null</code> is returned. + */ + public static final byte[] inflateBestEffort(byte[] in) { + return inflateBestEffort(in, Integer.MAX_VALUE); + } + + /** + * Returns an inflated copy of the input array, truncated to + * <code>sizeLimit</code> bytes, if necessary. If the deflated input has been + * truncated or corrupted, a best-effort attempt is made to inflate as much as + * possible. If no data can be extracted <code>null</code> is returned. 
+ */ + public static final byte[] inflateBestEffort(byte[] in, int sizeLimit) { + // decompress using InflaterInputStream + ByteArrayOutputStream outStream = new ByteArrayOutputStream( + EXPECTED_COMPRESSION_RATIO * in.length); + + // "true" because HTTP does not provide zlib headers + Inflater inflater = new Inflater(true); + InflaterInputStream inStream = new InflaterInputStream( + new ByteArrayInputStream(in), inflater); + + byte[] buf = new byte[BUF_SIZE]; + int written = 0; + while (true) { + try { + int size = inStream.read(buf); + if (size <= 0) + break; + if ((written + size) > sizeLimit) { + outStream.write(buf, 0, sizeLimit - written); + break; + } + outStream.write(buf, 0, size); + written += size; + } catch (Exception e) { + LOG.info("Caught Exception in inflateBestEffort", e); + break; + } + } + try { + outStream.close(); + } catch (IOException e) { + } + + return outStream.toByteArray(); + } + + /** + * Returns an inflated copy of the input array. + * + * @throws IOException + * if the input cannot be properly decompressed + */ + public static final byte[] inflate(byte[] in) throws IOException { + // decompress using InflaterInputStream + ByteArrayOutputStream outStream = new ByteArrayOutputStream( + EXPECTED_COMPRESSION_RATIO * in.length); + + InflaterInputStream inStream = new InflaterInputStream( + new ByteArrayInputStream(in)); + + byte[] buf = new byte[BUF_SIZE]; + while (true) { + int size = inStream.read(buf); + if (size <= 0) + break; + outStream.write(buf, 0, size); + } + outStream.close(); + + return outStream.toByteArray(); + } + + /** + * Returns a deflated copy of the input array. 
+ */ + public static final byte[] deflate(byte[] in) { + // compress using DeflaterOutputStream + ByteArrayOutputStream byteOut = new ByteArrayOutputStream(in.length + / EXPECTED_COMPRESSION_RATIO); + + DeflaterOutputStream outStream = new DeflaterOutputStream(byteOut); + + try { + outStream.write(in); + } catch (Exception e) { + LOG.error("Error compressing: ", e); + } + + try { + outStream.close(); + } catch (IOException e) { + LOG.error("Error closing: ", e); + } + + return byteOut.toByteArray(); + } +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/util/DomUtil.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/util/DomUtil.java b/nutch-core/src/main/java/org/apache/nutch/util/DomUtil.java new file mode 100644 index 0000000..9595bf4 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/util/DomUtil.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nutch.util; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.UnsupportedEncodingException; + +import javax.xml.transform.Transformer; +import javax.xml.transform.TransformerConfigurationException; +import javax.xml.transform.TransformerException; +import javax.xml.transform.TransformerFactory; +import javax.xml.transform.dom.DOMSource; +import javax.xml.transform.stream.StreamResult; + +import org.apache.xerces.parsers.DOMParser; +import org.w3c.dom.Element; +import org.xml.sax.InputSource; +import org.xml.sax.SAXException; + +// Slf4j Logging imports +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DomUtil { + + private final static Logger LOG = LoggerFactory.getLogger(DomUtil.class); + + /** + * Returns parsed dom tree or null if any error + * + * @param is + * @return A parsed DOM tree from the given {@link InputStream}. + */ + public static Element getDom(InputStream is) { + + Element element = null; + + DOMParser parser = new DOMParser(); + + InputSource input; + try { + input = new InputSource(is); + input.setEncoding("UTF-8"); + parser.parse(input); + int i = 0; + while (!(parser.getDocument().getChildNodes().item(i) instanceof Element)) { + i++; + } + element = (Element) parser.getDocument().getChildNodes().item(i); + } catch (FileNotFoundException e) { + LOG.error("Error: ", e); + } catch (SAXException e) { + LOG.error("Error: ", e); + } catch (IOException e) { + LOG.error("Error: ", e); + } + return element; + } + + /** + * save dom into ouputstream + * + * @param os + * @param e + */ + public static void saveDom(OutputStream os, Element e) { + + DOMSource source = new DOMSource(e); + TransformerFactory transFactory = TransformerFactory.newInstance(); + Transformer transformer; + try { + transformer = transFactory.newTransformer(); + transformer.setOutputProperty("indent", "yes"); + StreamResult result = new 
StreamResult(os); + transformer.transform(source, result); + os.flush(); + } catch (UnsupportedEncodingException e1) { + LOG.error("Error: ", e1); + } catch (IOException e1) { + LOG.error("Error: ", e1); + } catch (TransformerConfigurationException e2) { + LOG.error("Error: ", e2); + } catch (TransformerException ex) { + LOG.error("Error: ", ex); + } + } +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/util/DumpFileUtil.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/util/DumpFileUtil.java b/nutch-core/src/main/java/org/apache/nutch/util/DumpFileUtil.java new file mode 100644 index 0000000..9ed3e75 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/util/DumpFileUtil.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nutch.util; + +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.io.MD5Hash; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.Map; + +public class DumpFileUtil { + private static final Logger LOG = LoggerFactory.getLogger(DumpFileUtil.class + .getName()); + + private final static String DIR_PATTERN = "%s/%s/%s"; + private final static String FILENAME_PATTERN = "%s_%s.%s"; + private final static Integer MAX_LENGTH_OF_FILENAME = 32; + private final static Integer MAX_LENGTH_OF_EXTENSION = 5; + + public static String getUrlMD5(String url) { + byte[] digest = MD5Hash.digest(url).getDigest(); + + StringBuffer sb = new StringBuffer(); + for (byte b : digest) { + sb.append(String.format("%02x", b & 0xff)); + } + + return sb.toString(); + } + + public static String createTwoLevelsDirectory(String basePath, String md5, boolean makeDir) { + String firstLevelDirName = new StringBuilder().append(md5.charAt(0)).append(md5.charAt(8)).toString(); + String secondLevelDirName = new StringBuilder().append(md5.charAt(16)).append(md5.charAt(24)).toString(); + + String fullDirPath = String.format(DIR_PATTERN, basePath, firstLevelDirName, secondLevelDirName); + + if (makeDir) { + try { + FileUtils.forceMkdir(new File(fullDirPath)); + } catch (IOException e) { + LOG.error("Failed to create dir: {}", fullDirPath); + fullDirPath = null; + } + } + + return fullDirPath; + } + + public static String createTwoLevelsDirectory(String basePath, String md5) { + return createTwoLevelsDirectory(basePath, md5, true); + } + + public static String createFileName(String md5, String fileBaseName, String fileExtension) { + if (fileBaseName.length() > MAX_LENGTH_OF_FILENAME) { + LOG.info("File name is too long. 
Truncated to {} characters.", MAX_LENGTH_OF_FILENAME); + fileBaseName = StringUtils.substring(fileBaseName, 0, MAX_LENGTH_OF_FILENAME); + } + + if (fileExtension.length() > MAX_LENGTH_OF_EXTENSION) { + LOG.info("File extension is too long. Truncated to {} characters.", MAX_LENGTH_OF_EXTENSION); + fileExtension = StringUtils.substring(fileExtension, 0, MAX_LENGTH_OF_EXTENSION); + } + + // Added to prevent FileNotFoundException (Invalid Argument) - in *nix environment + fileBaseName = fileBaseName.replaceAll("\\?", ""); + fileExtension = fileExtension.replaceAll("\\?", ""); + + return String.format(FILENAME_PATTERN, md5, fileBaseName, fileExtension); + } + + public static String createFileNameFromUrl(String basePath, String reverseKey, String urlString, String epochScrapeTime, String fileExtension, boolean makeDir) { + String fullDirPath = basePath + File.separator + reverseKey + File.separator + DigestUtils.sha1Hex(urlString); + + if (makeDir) { + try { + FileUtils.forceMkdir(new File(fullDirPath)); + } catch (IOException e) { + LOG.error("Failed to create dir: {}", fullDirPath); + fullDirPath = null; + } + } + + if (fileExtension.length() > MAX_LENGTH_OF_EXTENSION) { + LOG.info("File extension is too long. Truncated to {} characters.", MAX_LENGTH_OF_EXTENSION); + fileExtension = StringUtils.substring(fileExtension, 0, MAX_LENGTH_OF_EXTENSION); + } + + String outputFullPath = fullDirPath + File.separator + epochScrapeTime + "." 
+ fileExtension; + + return outputFullPath; + } + + public static String displayFileTypes(Map<String, Integer> typeCounts, Map<String, Integer> filteredCounts) { + StringBuilder builder = new StringBuilder(); + // print total stats + builder.append("\nTOTAL Stats:\n"); + builder.append("[\n"); + int mimetypeCount = 0; + for (String mimeType : typeCounts.keySet()) { + builder.append(" {\"mimeType\":\""); + builder.append(mimeType); + builder.append("\",\"count\":\""); + builder.append(typeCounts.get(mimeType)); + builder.append("\"}\n"); + mimetypeCount += typeCounts.get(mimeType); + } + builder.append("]\n"); + builder.append("Total count: " + mimetypeCount + "\n"); + // filtered types stats + mimetypeCount = 0; + if (!filteredCounts.isEmpty()) { + builder.append("\nFILTERED Stats:\n"); + builder.append("[\n"); + for (String mimeType : filteredCounts.keySet()) { + builder.append(" {\"mimeType\":\""); + builder.append(mimeType); + builder.append("\",\"count\":\""); + builder.append(filteredCounts.get(mimeType)); + builder.append("\"}\n"); + mimetypeCount += filteredCounts.get(mimeType); + } + builder.append("]\n"); + builder.append("Total filtered count: " + mimetypeCount + "\n"); + } + return builder.toString(); + } +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/util/EncodingDetector.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/util/EncodingDetector.java b/nutch-core/src/main/java/org/apache/nutch/util/EncodingDetector.java new file mode 100644 index 0000000..4e62dd3 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/util/EncodingDetector.java @@ -0,0 +1,386 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nutch.util; + +import java.io.BufferedInputStream; +import java.io.ByteArrayOutputStream; +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.nutch.metadata.Metadata; +import org.apache.nutch.net.protocols.Response; +import org.apache.nutch.protocol.Content; +import org.apache.nutch.util.NutchConfiguration; + +import com.ibm.icu.text.CharsetDetector; +import com.ibm.icu.text.CharsetMatch; + +/** + * A simple class for detecting character encodings. + * + * <p> + * Broadly this encompasses two functions, which are distinctly separate: + * + * <ol> + * <li>Auto detecting a set of "clues" from input text.</li> + * <li>Taking a set of clues and making a "best guess" as to the "real" + * encoding.</li> + * </ol> + * </p> + * + * <p> + * A caller will often have some extra information about what the encoding might + * be (e.g. from the HTTP header or HTML meta-tags, often wrong but still + * potentially useful clues). The types of clues may differ from caller to + * caller. 
Thus a typical calling sequence is: + * <ul> + * <li>Run step (1) to generate a set of auto-detected clues;</li> + * <li>Combine these clues with the caller-dependent "extra clues" available;</li> + * <li>Run step (2) to guess what the most probable answer is.</li> + * </p> + */ +public class EncodingDetector { + + private class EncodingClue { + private String value; + private String source; + private int confidence; + + // Constructor for clues with no confidence values (ignore thresholds) + public EncodingClue(String value, String source) { + this(value, source, NO_THRESHOLD); + } + + public EncodingClue(String value, String source, int confidence) { + this.value = value.toLowerCase(); + this.source = source; + this.confidence = confidence; + } + + public String getSource() { + return source; + } + + public String getValue() { + return value; + } + + public String toString() { + return value + " (" + source + + ((confidence >= 0) ? ", " + confidence + "% confidence" : "") + ")"; + } + + public boolean isEmpty() { + return (value == null || "".equals(value)); + } + + public boolean meetsThreshold() { + return (confidence < 0 || (minConfidence >= 0 && confidence >= minConfidence)); + } + } + + public static final Logger LOG = LoggerFactory + .getLogger(EncodingDetector.class); + + public static final int NO_THRESHOLD = -1; + + public static final String MIN_CONFIDENCE_KEY = "encodingdetector.charset.min.confidence"; + + private static final HashMap<String, String> ALIASES = new HashMap<String, String>(); + + private static final HashSet<String> DETECTABLES = new HashSet<String>(); + + // CharsetDetector will die without a minimum amount of data. 
+ private static final int MIN_LENGTH = 4; + + static { + DETECTABLES.add("text/html"); + DETECTABLES.add("text/plain"); + DETECTABLES.add("text/richtext"); + DETECTABLES.add("text/rtf"); + DETECTABLES.add("text/sgml"); + DETECTABLES.add("text/tab-separated-values"); + DETECTABLES.add("text/xml"); + DETECTABLES.add("application/rss+xml"); + DETECTABLES.add("application/xhtml+xml"); + /* + * the following map is not an alias mapping table, but maps character + * encodings which are often used in mislabelled documents to their correct + * encodings. For instance, there are a lot of documents labelled + * 'ISO-8859-1' which contain characters not covered by ISO-8859-1 but + * covered by windows-1252. Because windows-1252 is a superset of ISO-8859-1 + * (sharing code points for the common part), it's better to treat + * ISO-8859-1 as synonymous with windows-1252 than to reject, as invalid, + * documents labelled as ISO-8859-1 that have characters outside ISO-8859-1. + */ + ALIASES.put("ISO-8859-1", "windows-1252"); + ALIASES.put("EUC-KR", "x-windows-949"); + ALIASES.put("x-EUC-CN", "GB18030"); + ALIASES.put("GBK", "GB18030"); + // ALIASES.put("Big5", "Big5HKSCS"); + // ALIASES.put("TIS620", "Cp874"); + // ALIASES.put("ISO-8859-11", "Cp874"); + + } + + private int minConfidence; + + private CharsetDetector detector; + + private List<EncodingClue> clues; + + public EncodingDetector(Configuration conf) { + minConfidence = conf.getInt(MIN_CONFIDENCE_KEY, -1); + detector = new CharsetDetector(); + clues = new ArrayList<EncodingClue>(); + } + + public void autoDetectClues(Content content, boolean filter) { + byte[] data = content.getContent(); + + if (minConfidence >= 0 && DETECTABLES.contains(content.getContentType()) + && data.length > MIN_LENGTH) { + CharsetMatch[] matches = null; + + // do all these in a try/catch; setText and detect/detectAll + // will sometimes throw exceptions + try { + detector.enableInputFilter(filter); + if (data.length > MIN_LENGTH) { + 
detector.setText(data); + matches = detector.detectAll(); + } + } catch (Exception e) { + LOG.debug("Exception from ICU4J (ignoring): ", e); + } + + if (matches != null) { + for (CharsetMatch match : matches) { + addClue(match.getName(), "detect", match.getConfidence()); + } + } + } + + // add character encoding coming from HTTP response header + addClue( + parseCharacterEncoding(content.getMetadata().get(Response.CONTENT_TYPE)), + "header"); + } + + public void addClue(String value, String source, int confidence) { + if (value == null || "".equals(value)) { + return; + } + value = resolveEncodingAlias(value); + if (value != null) { + clues.add(new EncodingClue(value, source, confidence)); + } + } + + public void addClue(String value, String source) { + addClue(value, source, NO_THRESHOLD); + } + + /** + * Guess the encoding with the previously specified list of clues. + * + * @param content + * Content instance + * @param defaultValue + * Default encoding to return if no encoding can be detected with + * enough confidence. Note that this will <b>not</b> be normalized + * with {@link EncodingDetector#resolveEncodingAlias} + * + * @return Guessed encoding or defaultValue + */ + public String guessEncoding(Content content, String defaultValue) { + /* + * This algorithm could be replaced by something more sophisticated; ideally + * we would gather a bunch of data on where various clues (autodetect, HTTP + * headers, HTML meta tags, etc.) disagree, tag each with the correct + * answer, and use machine learning/some statistical method to generate a + * better heuristic. + */ + + String base = content.getBaseUrl(); + + if (LOG.isTraceEnabled()) { + findDisagreements(base, clues); + } + + /* + * Go down the list of encoding "clues". Use a clue if: 1. Has a confidence + * value which meets our confidence threshold, OR 2. Doesn't meet the + * threshold, but is the best try, since nothing else is available. 
+ */ + EncodingClue defaultClue = new EncodingClue(defaultValue, "default"); + EncodingClue bestClue = defaultClue; + + for (EncodingClue clue : clues) { + if (LOG.isTraceEnabled()) { + LOG.trace(base + ": charset " + clue); + } + String charset = clue.value; + if (minConfidence >= 0 && clue.confidence >= minConfidence) { + if (LOG.isTraceEnabled()) { + LOG.trace(base + ": Choosing encoding: " + charset + + " with confidence " + clue.confidence); + } + return resolveEncodingAlias(charset).toLowerCase(); + } else if (clue.confidence == NO_THRESHOLD && bestClue == defaultClue) { + bestClue = clue; + } + } + + if (LOG.isTraceEnabled()) { + LOG.trace(base + ": Choosing encoding: " + bestClue); + } + return bestClue.value.toLowerCase(); + } + + /** Clears all clues. */ + public void clearClues() { + clues.clear(); + } + + /* + * Strictly for analysis, look for "disagreements." The top guess from each + * source is examined; if these meet the threshold and disagree, then we log + * the information -- useful for testing or generating training data for a + * better heuristic. 
+ */ + private void findDisagreements(String url, List<EncodingClue> newClues) { + HashSet<String> valsSeen = new HashSet<String>(); + HashSet<String> sourcesSeen = new HashSet<String>(); + boolean disagreement = false; + for (int i = 0; i < newClues.size(); i++) { + EncodingClue clue = newClues.get(i); + if (!clue.isEmpty() && !sourcesSeen.contains(clue.source)) { + if (valsSeen.size() > 0 && !valsSeen.contains(clue.value) + && clue.meetsThreshold()) { + disagreement = true; + } + if (clue.meetsThreshold()) { + valsSeen.add(clue.value); + } + sourcesSeen.add(clue.source); + } + } + if (disagreement) { + // dump all values in case of disagreement + StringBuffer sb = new StringBuffer(); + sb.append("Disagreement: " + url + "; "); + for (int i = 0; i < newClues.size(); i++) { + if (i > 0) { + sb.append(", "); + } + sb.append(newClues.get(i)); + } + LOG.trace(sb.toString()); + } + } + + public static String resolveEncodingAlias(String encoding) { + try { + if (encoding == null || !Charset.isSupported(encoding)) + return null; + String canonicalName = new String(Charset.forName(encoding).name()); + return ALIASES.containsKey(canonicalName) ? ALIASES.get(canonicalName) + : canonicalName; + } catch (Exception e) { + LOG.warn("Invalid encoding " + encoding + " detected, using default."); + return null; + } + } + + /** + * Parse the character encoding from the specified content type header. If the + * content type is null, or there is no explicit character encoding, + * <code>null</code> is returned. <br /> + * This method was copied from org.apache.catalina.util.RequestUtil, which is + * licensed under the Apache License, Version 2.0 (the "License"). 
+ * + * @param contentType + * a content type header + */ + public static String parseCharacterEncoding(String contentType) { + if (contentType == null) + return (null); + int start = contentType.indexOf("charset="); + if (start < 0) + return (null); + String encoding = contentType.substring(start + 8); + int end = encoding.indexOf(';'); + if (end >= 0) + encoding = encoding.substring(0, end); + encoding = encoding.trim(); + if ((encoding.length() > 2) && (encoding.startsWith("\"")) + && (encoding.endsWith("\""))) + encoding = encoding.substring(1, encoding.length() - 1); + return (encoding.trim()); + + } + + public static void main(String[] args) throws IOException { + if (args.length != 1) { + System.err.println("Usage: EncodingDetector <file>"); + System.exit(1); + } + + Configuration conf = NutchConfiguration.create(); + EncodingDetector detector = new EncodingDetector( + NutchConfiguration.create()); + + // do everything as bytes; don't want any conversion + BufferedInputStream istr = new BufferedInputStream(new FileInputStream( + args[0])); + ByteArrayOutputStream ostr = new ByteArrayOutputStream(); + byte[] bytes = new byte[1000]; + boolean more = true; + while (more) { + int len = istr.read(bytes); + if (len < bytes.length) { + more = false; + if (len > 0) { + ostr.write(bytes, 0, len); + } + } else { + ostr.write(bytes); + } + } + + byte[] data = ostr.toByteArray(); + + // make a fake Content + Content content = new Content("", "", data, "text/html", new Metadata(), + conf); + + detector.autoDetectClues(content, true); + String encoding = detector.guessEncoding(content, + conf.get("parser.character.encoding.default")); + System.out.println("Guessed encoding: " + encoding); + } + +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/util/FSUtils.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/util/FSUtils.java 
b/nutch-core/src/main/java/org/apache/nutch/util/FSUtils.java new file mode 100644 index 0000000..6aed8d5 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/util/FSUtils.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nutch.util; + +import java.io.IOException; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.MapFile; +import org.apache.hadoop.io.SequenceFile; + +/** + * Utility methods for common filesystem operations. + */ +public class FSUtils { + + /** + * Replaces the current path with the new path and if set removes the old + * path. If removeOld is set to false then the old path will be set to the + * name current.old. + * + * @param fs + * The FileSystem. + * @param current + * The end path, the one being replaced. + * @param replacement + * The path to replace with. + * @param removeOld + * True if we are removing the current path. + * + * @throws IOException + * If an error occurs during replacement. 
+ */ + public static void replace(FileSystem fs, Path current, Path replacement, + boolean removeOld) throws IOException { + + // rename any current path to old + Path old = new Path(current + ".old"); + if (fs.exists(current)) { + fs.rename(current, old); + } + + // rename the new path to current and remove the old path if needed + fs.rename(replacement, current); + if (fs.exists(old) && removeOld) { + fs.delete(old, true); + } + } + + /** + * Closes a group of SequenceFile readers. + * + * @param readers + * The SequenceFile readers to close. + * @throws IOException + * If an error occurs while closing a reader. + */ + public static void closeReaders(SequenceFile.Reader[] readers) + throws IOException { + + // loop through the readers, closing one by one + if (readers != null) { + for (int i = 0; i < readers.length; i++) { + SequenceFile.Reader reader = readers[i]; + if (reader != null) { + reader.close(); + } + } + } + } + + /** + * Closes a group of MapFile readers. + * + * @param readers + * The MapFile readers to close. + * @throws IOException + * If an error occurs while closing a reader. 
+ */ + public static void closeReaders(MapFile.Reader[] readers) throws IOException { + + // loop through the readers closing one by one + if (readers != null) { + for (int i = 0; i < readers.length; i++) { + MapFile.Reader reader = readers[i]; + if (reader != null) { + reader.close(); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/util/GZIPUtils.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/util/GZIPUtils.java b/nutch-core/src/main/java/org/apache/nutch/util/GZIPUtils.java new file mode 100644 index 0000000..63b10e2 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/util/GZIPUtils.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nutch.util; + +import java.io.ByteArrayOutputStream; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +// Slf4j Logging imports +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A collection of utility methods for working on GZIPed data. 
+ */ +public class GZIPUtils { + + private static final Logger LOG = LoggerFactory.getLogger(GZIPUtils.class); + private static final int EXPECTED_COMPRESSION_RATIO = 5; + private static final int BUF_SIZE = 4096; + + /** + * Returns an gunzipped copy of the input array. If the gzipped input has been + * truncated or corrupted, a best-effort attempt is made to unzip as much as + * possible. If no data can be extracted <code>null</code> is returned. + */ + public static final byte[] unzipBestEffort(byte[] in) { + return unzipBestEffort(in, Integer.MAX_VALUE); + } + + /** + * Returns an gunzipped copy of the input array, truncated to + * <code>sizeLimit</code> bytes, if necessary. If the gzipped input has been + * truncated or corrupted, a best-effort attempt is made to unzip as much as + * possible. If no data can be extracted <code>null</code> is returned. + */ + public static final byte[] unzipBestEffort(byte[] in, int sizeLimit) { + try { + // decompress using GZIPInputStream + ByteArrayOutputStream outStream = new ByteArrayOutputStream( + EXPECTED_COMPRESSION_RATIO * in.length); + + GZIPInputStream inStream = new GZIPInputStream(new ByteArrayInputStream( + in)); + + byte[] buf = new byte[BUF_SIZE]; + int written = 0; + while (true) { + try { + int size = inStream.read(buf); + if (size <= 0) + break; + if ((written + size) > sizeLimit) { + outStream.write(buf, 0, sizeLimit - written); + break; + } + outStream.write(buf, 0, size); + written += size; + } catch (Exception e) { + break; + } + } + try { + outStream.close(); + } catch (IOException e) { + } + + return outStream.toByteArray(); + + } catch (IOException e) { + return null; + } + } + + /** + * Returns an gunzipped copy of the input array. 
+ * + * @throws IOException + * if the input cannot be properly decompressed + */ + public static final byte[] unzip(byte[] in) throws IOException { + // decompress using GZIPInputStream + ByteArrayOutputStream outStream = new ByteArrayOutputStream( + EXPECTED_COMPRESSION_RATIO * in.length); + + GZIPInputStream inStream = new GZIPInputStream(new ByteArrayInputStream(in)); + + byte[] buf = new byte[BUF_SIZE]; + while (true) { + int size = inStream.read(buf); + if (size <= 0) + break; + outStream.write(buf, 0, size); + } + outStream.close(); + + return outStream.toByteArray(); + } + + /** + * Returns an gzipped copy of the input array. + */ + public static final byte[] zip(byte[] in) { + try { + // compress using GZIPOutputStream + ByteArrayOutputStream byteOut = new ByteArrayOutputStream(in.length + / EXPECTED_COMPRESSION_RATIO); + + GZIPOutputStream outStream = new GZIPOutputStream(byteOut); + + try { + outStream.write(in); + } catch (Exception e) { + LOG.error("Error writing outStream: ", e); + } + + try { + outStream.close(); + } catch (IOException e) { + LOG.error("Error closing outStream: ", e); + } + + return byteOut.toByteArray(); + + } catch (IOException e) { + LOG.error("Error: ", e); + return null; + } + } + +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/util/GenericWritableConfigurable.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/util/GenericWritableConfigurable.java b/nutch-core/src/main/java/org/apache/nutch/util/GenericWritableConfigurable.java new file mode 100644 index 0000000..755aad0 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/util/GenericWritableConfigurable.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nutch.util;

import java.io.DataInput;
import java.io.IOException;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.GenericWritable;
import org.apache.hadoop.io.Writable;

/**
 * A generic Writable wrapper that can inject Configuration to
 * {@link Configurable}s
 */
public abstract class GenericWritableConfigurable extends GenericWritable
    implements Configurable {

  private Configuration conf;

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  /**
   * Reads the type index byte, instantiates the corresponding entry of
   * {@link #getTypes()}, hands it this object's Configuration if it is
   * {@link Configurable}, and then lets it read its own fields.
   *
   * @throws IOException
   *           if reading fails, the type index is out of range, or the type
   *           cannot be instantiated (the original exception is the cause)
   */
  @Override
  public void readFields(DataInput in) throws IOException {
    byte type = in.readByte();
    Class<?>[] types = getTypes();
    // corrupt input must surface as IOException, not an unchecked
    // ArrayIndexOutOfBoundsException
    if (type < 0 || type >= types.length) {
      throw new IOException("Unknown type index " + type + ", expected 0.."
          + (types.length - 1));
    }
    Class<?> clazz = types[type];
    try {
      set((Writable) clazz.newInstance());
    } catch (Exception e) {
      // chain the cause instead of printing the stack trace to stderr
      throw new IOException("Cannot initialize the class: " + clazz, e);
    }
    Writable w = get();
    if (w instanceof Configurable)
      ((Configurable) w).setConf(conf);
    w.readFields(in);
  }

}
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/util/HadoopFSUtil.java
----------------------------------------------------------------------
diff --git
a/nutch-core/src/main/java/org/apache/nutch/util/HadoopFSUtil.java b/nutch-core/src/main/java/org/apache/nutch/util/HadoopFSUtil.java
new file mode 100644
index 0000000..6f471c1
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/util/HadoopFSUtil.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.util;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+public class HadoopFSUtil {
+
+  /**
+   * Builds a {@link PathFilter} that accepts every path it is given.
+   */
+  public static PathFilter getPassAllFilter() {
+    return new PathFilter() {
+      @Override
+      public boolean accept(Path ignored) {
+        return true;
+      }
+    };
+  }
+
+  /**
+   * Builds a {@link PathFilter} that accepts only paths that are directories
+   * on <code>fs</code>. A path whose status cannot be read is rejected.
+   */
+  public static PathFilter getPassDirectoriesFilter(final FileSystem fs) {
+    return new PathFilter() {
+      @Override
+      public boolean accept(final Path candidate) {
+        boolean isDir;
+        try {
+          isDir = fs.getFileStatus(candidate).isDirectory();
+        } catch (IOException unreadable) {
+          // missing or unreadable paths are simply filtered out
+          isDir = false;
+        }
+        return isDir;
+      }
+    };
+  }
+
+  /**
+   * Maps each {@link FileStatus} to its {@link Path}, preserving order.
+   *
+   * @return the paths (an empty array for empty input), or <code>null</code>
+   *         when <code>stats</code> is <code>null</code>
+   */
+  public static Path[] getPaths(FileStatus[] stats) {
+    if (stats == null) {
+      return null;
+    }
+    Path[] paths = new Path[stats.length];
+    int idx = 0;
+    for (FileStatus status : stats) {
+      paths[idx++] = status.getPath();
+    }
+    return paths;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/util/JexlUtil.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/util/JexlUtil.java b/nutch-core/src/main/java/org/apache/nutch/util/JexlUtil.java
new file mode 100644
index 0000000..656a458
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/util/JexlUtil.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.util;
+
+import java.util.Date;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.jexl2.Expression;
+import org.apache.commons.jexl2.JexlEngine;
+import org.apache.commons.lang.time.DateUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A collection of Jexl utilit(y|ies).
+ */
+public class JexlUtil {
+
+  public static final Logger LOG = LoggerFactory.getLogger(JexlUtil.class);
+
+  /**
+   * Matches date literals of the form yyyy-MM-ddTHH:mm:ssZ (for example
+   * 2016-03-20T00:00:00Z) inside a Jexl expression.
+   */
+  public static Pattern datePattern = Pattern.compile("\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}Z");
+
+  /**
+   * Parses the given expression to a Jexl expression. This supports date
+   * literals: every substring matching {@link #datePattern} is replaced by its
+   * value in milliseconds since the epoch before the expression is compiled.
+   * The trailing 'Z' denotes UTC, so dates are interpreted in UTC.
+   *
+   * @param expr the Jexl expression
+   * @return parsed Jexl expression, or null if <code>expr</code> is null or
+   *         cannot be parsed (the error is logged)
+   */
+  public static Expression parseExpression(String expr) {
+    if (expr == null) return null;
+
+    try {
+      // Strict parser for the date literals. 'Z' means UTC, so the formatter
+      // must not fall back to the JVM's default time zone.
+      java.text.SimpleDateFormat dateFormat = new java.text.SimpleDateFormat(
+          "yyyy-MM-dd'T'HH:mm:ss'Z'", java.util.Locale.ROOT);
+      dateFormat.setLenient(false);
+      dateFormat.setTimeZone(java.util.TimeZone.getTimeZone("UTC"));
+
+      // Translate every date literal (not only the first one) into a long,
+      // e.g. "fetchTime > 2016-03-20T00:00:00Z" -> "fetchTime > 1458432000000"
+      Matcher matcher = datePattern.matcher(expr);
+      StringBuffer translated = new StringBuffer(expr.length());
+      while (matcher.find()) {
+        Date parsedDate = dateFormat.parse(matcher.group());
+        matcher.appendReplacement(translated,
+            Long.toString(parsedDate.getTime()));
+      }
+      matcher.appendTail(translated);
+
+      JexlEngine jexl = new JexlEngine();
+      jexl.setSilent(true);
+      jexl.setStrict(true);
+      return jexl.createExpression(translated.toString());
+    } catch (Exception e) {
+      // log the offending expression and keep the stack trace
+      LOG.error("Unable to parse Jexl expression: " + expr, e);
+    }
+
+    return null;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/util/LockUtil.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/util/LockUtil.java b/nutch-core/src/main/java/org/apache/nutch/util/LockUtil.java
new file mode 100644
index 0000000..7e3bb97
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/util/LockUtil.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.util;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Utility methods for handling application-level locking.
+ *
+ * @author Andrzej Bialecki
+ */
+public class LockUtil {
+
+  /**
+   * Create a lock file.
+   *
+   * @param fs
+   *          filesystem
+   * @param lockFile
+   *          name of the lock file
+   * @param accept
+   *          if true, and the target file exists, consider it valid. If false
+   *          and the target file exists, throw an IOException.
+   * @throws IOException
+   *           if accept is false, and the target file already exists (also
+   *           when it appears concurrently, after the existence check), or if
+   *           it's a directory.
+   */
+  public static void createLockFile(FileSystem fs, Path lockFile, boolean accept)
+      throws IOException {
+    if (!fs.exists(lockFile)) {
+      // make sure parents exist (a root path has no parent)
+      Path parent = lockFile.getParent();
+      if (parent != null) {
+        fs.mkdirs(parent);
+      }
+      if (fs.createNewFile(lockFile)) {
+        return; // we created a fresh lock file
+      }
+      // createNewFile() returned false: the path already exists (typically
+      // created by someone else after our exists() check) or could not be
+      // created. Do not silently assume we own the lock; fall through to the
+      // same checks as for a pre-existing file.
+    }
+    if (!accept) {
+      throw new IOException("lock file " + lockFile + " already exists.");
+    }
+    if (fs.getFileStatus(lockFile).isDirectory()) {
+      throw new IOException("lock file " + lockFile
+          + " already exists and is a directory.");
+    }
+    // do nothing - the file already exists.
+  }
+
+  /**
+   * Remove lock file. NOTE: applications enforce the semantics of this file -
+   * this method simply removes any file with a given name.
+   *
+   * @param fs
+   *          filesystem
+   * @param lockFile
+   *          lock file name
+   * @return false, if the lock file doesn't exist. True, if it existed and was
+   *         successfully removed.
+   * @throws IOException
+   *           if lock file exists but it is a directory.
+   */
+  public static boolean removeLockFile(FileSystem fs, Path lockFile)
+      throws IOException {
+    if (!fs.exists(lockFile)) {
+      return false;
+    }
+    if (fs.getFileStatus(lockFile).isDirectory()) {
+      throw new IOException("lock file " + lockFile
+          + " exists but is a directory!");
+    }
+    // non-recursive delete: only a plain file is expected here
+    return fs.delete(lockFile, false);
+  }
+}
