http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/fetcher/Fetcher.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/fetcher/Fetcher.java b/nutch-core/src/main/java/org/apache/nutch/fetcher/Fetcher.java new file mode 100644 index 0000000..aad9ee9 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/fetcher/Fetcher.java @@ -0,0 +1,600 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nutch.fetcher; + +import java.io.File; +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.io.*; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.conf.*; +import org.apache.hadoop.mapred.*; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.nutch.crawl.CrawlDatum; +import org.apache.nutch.crawl.NutchWritable; +import org.apache.nutch.metadata.Nutch; +import org.apache.nutch.protocol.*; +import org.apache.nutch.util.*; + +/** + * A queue-based fetcher. + * + * <p> + * This fetcher uses a well-known model of one producer (a QueueFeeder) and many + * consumers (FetcherThread-s). + * + * <p> + * QueueFeeder reads input fetchlists and populates a set of FetchItemQueue-s, + * which hold FetchItem-s that describe the items to be fetched. There are as + * many queues as there are unique hosts, but at any given time the total number + * of fetch items in all queues is less than a fixed number (currently set to a + * multiple of the number of threads). + * + * <p> + * As items are consumed from the queues, the QueueFeeder continues to add new + * input items, so that their total count stays fixed (FetcherThread-s may also + * add new items to the queues e.g. as a results of redirection) - until all + * input items are exhausted, at which point the number of items in the queues + * begins to decrease. When this number reaches 0 fetcher will finish. + * + * <p> + * This fetcher implementation handles per-host blocking itself, instead of + * delegating this work to protocol-specific plugins. Each per-host queue + * handles its own "politeness" settings, such as the maximum number of + * concurrent requests and crawl delay between consecutive requests - and also a + * list of requests in progress, and the time the last request was finished. 
As + * FetcherThread-s ask for new items to be fetched, queues may return eligible + * items or null if for "politeness" reasons this host's queue is not yet ready. + * + * <p> + * If there are still unfetched items in the queues, but none of the items are + * ready, FetcherThread-s will spin-wait until either some items become + * available, or a timeout is reached (at which point the Fetcher will abort, + * assuming the task is hung). + * + * @author Andrzej Bialecki + */ +public class Fetcher extends NutchTool implements Tool, +MapRunnable<Text, CrawlDatum, Text, NutchWritable> { + + public static final int PERM_REFRESH_TIME = 5; + + public static final String CONTENT_REDIR = "content"; + + public static final String PROTOCOL_REDIR = "protocol"; + + public static final Logger LOG = LoggerFactory.getLogger(Fetcher.class); + + public static class InputFormat extends + SequenceFileInputFormat<Text, CrawlDatum> { + /** Don't split inputs, to keep things polite. */ + public InputSplit[] getSplits(JobConf job, int nSplits) throws IOException { + FileStatus[] files = listStatus(job); + FileSplit[] splits = new FileSplit[files.length]; + for (int i = 0; i < files.length; i++) { + FileStatus cur = files[i]; + splits[i] = new FileSplit(cur.getPath(), 0, cur.getLen(), + (String[]) null); + } + return splits; + } + } + + @SuppressWarnings("unused") + private OutputCollector<Text, NutchWritable> output; + private Reporter reporter; + + private String segmentName; + private AtomicInteger activeThreads = new AtomicInteger(0); + private AtomicInteger spinWaiting = new AtomicInteger(0); + + private long start = System.currentTimeMillis(); // start time of fetcher run + private AtomicLong lastRequestStart = new AtomicLong(start); + + private AtomicLong bytes = new AtomicLong(0); // total bytes fetched + private AtomicInteger pages = new AtomicInteger(0); // total pages fetched + private AtomicInteger errors = new AtomicInteger(0); // total pages errored + + private boolean storingContent; + private boolean parsing; + FetchItemQueues fetchQueues; + QueueFeeder feeder; + + LinkedList<FetcherThread> fetcherThreads = new LinkedList<FetcherThread>(); + + public Fetcher() { + super(null); + } + + public Fetcher(Configuration conf) { + super(conf); + } + + private void reportStatus(int pagesLastSec, int bytesLastSec) + throws IOException { + StringBuilder status = new StringBuilder(); + Long elapsed = new Long((System.currentTimeMillis() - start) / 1000); + + float avgPagesSec = (float) pages.get() / elapsed.floatValue(); + long avgBytesSec = (bytes.get() / 128l) / elapsed.longValue(); + + status.append(activeThreads).append(" threads (").append(spinWaiting.get()) + .append(" waiting), "); + status.append(fetchQueues.getQueueCount()).append(" queues, "); + status.append(fetchQueues.getTotalSize()).append(" URLs queued, "); + status.append(pages).append(" pages, ").append(errors).append(" errors, "); + status.append(String.format("%.2f", avgPagesSec)).append(" pages/s ("); + status.append(pagesLastSec).append(" last sec), "); + status.append(avgBytesSec).append(" kbits/s (") + .append((bytesLastSec / 128)).append(" last sec)"); + + reporter.setStatus(status.toString()); + } + + public void configure(JobConf job) { + setConf(job); + + this.segmentName = job.get(Nutch.SEGMENT_NAME_KEY); + this.storingContent = isStoringContent(job); + this.parsing = isParsing(job); + + // if (job.getBoolean("fetcher.verbose", false)) { + // LOG.setLevel(Level.FINE); + // } + } + + public void close() { + } + + public static 
boolean isParsing(Configuration conf) { + return conf.getBoolean("fetcher.parse", true); + } + + public static boolean isStoringContent(Configuration conf) { + return conf.getBoolean("fetcher.store.content", true); + } + + public void run(RecordReader<Text, CrawlDatum> input, + OutputCollector<Text, NutchWritable> output, Reporter reporter) + throws IOException { + + this.output = output; + this.reporter = reporter; + this.fetchQueues = new FetchItemQueues(getConf()); + + int threadCount = getConf().getInt("fetcher.threads.fetch", 10); + if (LOG.isInfoEnabled()) { + LOG.info("Fetcher: threads: {}", threadCount); + } + + int timeoutDivisor = getConf().getInt("fetcher.threads.timeout.divisor", 2); + if (LOG.isInfoEnabled()) { + LOG.info("Fetcher: time-out divisor: {}", timeoutDivisor); + } + + int queueDepthMuliplier = getConf().getInt( + "fetcher.queue.depth.multiplier", 50); + + feeder = new QueueFeeder(input, fetchQueues, threadCount + * queueDepthMuliplier); + // feeder.setPriority((Thread.MAX_PRIORITY + Thread.NORM_PRIORITY) / 2); + + // the value of the time limit is either -1 or the time where it should + // finish + long timelimit = getConf().getLong("fetcher.timelimit", -1); + if (timelimit != -1) + feeder.setTimeLimit(timelimit); + feeder.start(); + + // set non-blocking & no-robots mode for HTTP protocol plugins. + getConf().setBoolean(Protocol.CHECK_BLOCKING, false); + getConf().setBoolean(Protocol.CHECK_ROBOTS, false); + + for (int i = 0; i < threadCount; i++) { // spawn threads + FetcherThread t = new FetcherThread(getConf(), getActiveThreads(), fetchQueues, + feeder, spinWaiting, lastRequestStart, reporter, errors, segmentName, + parsing, output, storingContent, pages, bytes); + fetcherThreads.add(t); + t.start(); + } + + // select a timeout that avoids a task timeout + long timeout = getConf().getInt("mapred.task.timeout", 10 * 60 * 1000) + / timeoutDivisor; + + // Used for threshold check, holds pages and bytes processed in the last + // second + int pagesLastSec; + int bytesLastSec; + + int throughputThresholdNumRetries = 0; + + int throughputThresholdPages = getConf().getInt( + "fetcher.throughput.threshold.pages", -1); + if (LOG.isInfoEnabled()) { + LOG.info("Fetcher: throughput threshold: {}", throughputThresholdPages); + } + int throughputThresholdMaxRetries = getConf().getInt( + "fetcher.throughput.threshold.retries", 5); + if (LOG.isInfoEnabled()) { + LOG.info("Fetcher: throughput threshold retries: {}", + throughputThresholdMaxRetries); + } + long throughputThresholdTimeLimit = getConf().getLong( + "fetcher.throughput.threshold.check.after", -1); + + int targetBandwidth = getConf().getInt("fetcher.bandwidth.target", -1) * 1000; + int maxNumThreads = getConf().getInt("fetcher.maxNum.threads", threadCount); + if (maxNumThreads < threadCount) { + LOG.info("fetcher.maxNum.threads can't be < than {} : using {} instead", + threadCount, threadCount); + maxNumThreads = threadCount; + } + int bandwidthTargetCheckEveryNSecs = getConf().getInt( + "fetcher.bandwidth.target.check.everyNSecs", 30); + if (bandwidthTargetCheckEveryNSecs < 1) { + LOG.info("fetcher.bandwidth.target.check.everyNSecs can't be < to 1 : using 1 instead"); + bandwidthTargetCheckEveryNSecs = 1; + } + + int maxThreadsPerQueue = getConf().getInt("fetcher.threads.per.queue", 1); + + int bandwidthTargetCheckCounter = 0; + long bytesAtLastBWTCheck = 0l; + + do { // wait for threads to exit + pagesLastSec = pages.get(); + bytesLastSec = (int) bytes.get(); + + try { + Thread.sleep(1000); + } catch 
(InterruptedException e) { + } + + pagesLastSec = pages.get() - pagesLastSec; + bytesLastSec = (int) bytes.get() - bytesLastSec; + + reporter.incrCounter("FetcherStatus", "bytes_downloaded", bytesLastSec); + + reportStatus(pagesLastSec, bytesLastSec); + + LOG.info("-activeThreads=" + activeThreads + ", spinWaiting=" + + spinWaiting.get() + ", fetchQueues.totalSize=" + + fetchQueues.getTotalSize() + ", fetchQueues.getQueueCount=" + + fetchQueues.getQueueCount()); + + if (!feeder.isAlive() && fetchQueues.getTotalSize() < 5) { + fetchQueues.dump(); + } + + // if throughput threshold is enabled + if (throughputThresholdTimeLimit < System.currentTimeMillis() + && throughputThresholdPages != -1) { + // Check if we're dropping below the threshold + if (pagesLastSec < throughputThresholdPages) { + throughputThresholdNumRetries++; + LOG.warn("{}: dropping below configured threshold of {} pages per second", + Integer.toString(throughputThresholdNumRetries), Integer.toString(throughputThresholdPages)); + + // Quit if we dropped below threshold too many times + if (throughputThresholdNumRetries == throughputThresholdMaxRetries) { + LOG.warn("Dropped below threshold too many times, killing!"); + + // Disable the threshold checker + throughputThresholdPages = -1; + + // Empty the queues cleanly and get number of items that were + // dropped + int hitByThrougputThreshold = fetchQueues.emptyQueues(); + + if (hitByThrougputThreshold != 0) + reporter.incrCounter("FetcherStatus", "hitByThrougputThreshold", + hitByThrougputThreshold); + } + } + } + + // adjust the number of threads if a target bandwidth has been set + if (targetBandwidth > 0) { + if (bandwidthTargetCheckCounter < bandwidthTargetCheckEveryNSecs) + bandwidthTargetCheckCounter++; + else if (bandwidthTargetCheckCounter == bandwidthTargetCheckEveryNSecs) { + long bpsSinceLastCheck = ((bytes.get() - bytesAtLastBWTCheck) * 8) + / bandwidthTargetCheckEveryNSecs; + + bytesAtLastBWTCheck = bytes.get(); + bandwidthTargetCheckCounter = 0; + + int averageBdwPerThread = 0; + if (activeThreads.get() > 0) + averageBdwPerThread = Math.round(bpsSinceLastCheck + / activeThreads.get()); + + LOG.info("averageBdwPerThread : {} kbps", (averageBdwPerThread / 1000)); + + if (bpsSinceLastCheck < targetBandwidth && averageBdwPerThread > 0) { + // check whether it is worth doing e.g. more queues than threads + + if ((fetchQueues.getQueueCount() * maxThreadsPerQueue) > activeThreads + .get()) { + + long remainingBdw = targetBandwidth - bpsSinceLastCheck; + int additionalThreads = Math.round(remainingBdw + / averageBdwPerThread); + int availableThreads = maxNumThreads - activeThreads.get(); + + // determine the number of available threads (min between + // availableThreads and additionalThreads) + additionalThreads = (availableThreads < additionalThreads ? 
availableThreads + : additionalThreads); + LOG.info("Has space for more threads ({} vs {} kbps) \t=> adding {} new threads", + (bpsSinceLastCheck / 1000), (targetBandwidth / 1000), additionalThreads); + // activate new threads + for (int i = 0; i < additionalThreads; i++) { + FetcherThread thread = new FetcherThread(getConf(), getActiveThreads(), fetchQueues, + feeder, spinWaiting, lastRequestStart, reporter, errors, segmentName, parsing, + output, storingContent, pages, bytes); + fetcherThreads.add(thread); + thread.start(); + } + } + } else if (bpsSinceLastCheck > targetBandwidth + && averageBdwPerThread > 0) { + // if the bandwidth we're using is greater then the expected + // bandwidth, we have to stop some threads + long excessBdw = bpsSinceLastCheck - targetBandwidth; + int excessThreads = Math.round(excessBdw / averageBdwPerThread); + LOG.info("Exceeding target bandwidth ({} vs {} kbps). \t=> excessThreads = {}", + bpsSinceLastCheck / 1000, (targetBandwidth / 1000), excessThreads); + // keep at least one + if (excessThreads >= fetcherThreads.size()) + excessThreads = 0; + // de-activates threads + for (int i = 0; i < excessThreads; i++) { + FetcherThread thread = fetcherThreads.removeLast(); + thread.setHalted(true); + } + } + } + } + + // check timelimit + if (!feeder.isAlive()) { + int hitByTimeLimit = fetchQueues.checkTimelimit(); + if (hitByTimeLimit != 0) + reporter.incrCounter("FetcherStatus", "hitByTimeLimit", + hitByTimeLimit); + } + + // some requests seem to hang, despite all intentions + if ((System.currentTimeMillis() - lastRequestStart.get()) > timeout) { + if (LOG.isWarnEnabled()) { + LOG.warn("Aborting with {} hung threads.", activeThreads); + for (int i = 0; i < fetcherThreads.size(); i++) { + FetcherThread thread = fetcherThreads.get(i); + if (thread.isAlive()) { + LOG.warn("Thread #{} hung while processing {}", i, thread.getReprUrl()); + if (LOG.isDebugEnabled()) { + StackTraceElement[] stack = thread.getStackTrace(); + StringBuilder sb = new StringBuilder(); + sb.append("Stack of thread #").append(i).append(":\n"); + for (StackTraceElement s : stack) { + sb.append(s.toString()).append('\n'); + } + LOG.debug(sb.toString()); + } + } + } + } + return; + } + + } while (activeThreads.get() > 0); + LOG.info("-activeThreads={}", activeThreads); + + } + + public void fetch(Path segment, int threads) throws IOException { + + checkConfiguration(); + + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + long start = System.currentTimeMillis(); + if (LOG.isInfoEnabled()) { + LOG.info("Fetcher: starting at {}", sdf.format(start)); + LOG.info("Fetcher: segment: {}", segment); + } + + // set the actual time for the timelimit relative + // to the beginning of the whole job and not of a specific task + // otherwise it keeps trying again if a task fails + long timelimit = getConf().getLong("fetcher.timelimit.mins", -1); + if (timelimit != -1) { + timelimit = System.currentTimeMillis() + (timelimit * 60 * 1000); + LOG.info("Fetcher Timelimit set for : {}", timelimit); + getConf().setLong("fetcher.timelimit", timelimit); + } + + // Set the time limit after which the throughput threshold feature is + // enabled + timelimit = getConf().getLong("fetcher.throughput.threshold.check.after", + 10); + timelimit = System.currentTimeMillis() + (timelimit * 60 * 1000); + getConf().setLong("fetcher.throughput.threshold.check.after", timelimit); + + int maxOutlinkDepth = getConf().getInt("fetcher.follow.outlinks.depth", -1); + if (maxOutlinkDepth > 0) { + LOG.info("Fetcher: 
following outlinks up to depth: {}", + Integer.toString(maxOutlinkDepth)); + + int maxOutlinkDepthNumLinks = getConf().getInt( + "fetcher.follow.outlinks.num.links", 4); + int outlinksDepthDivisor = getConf().getInt( + "fetcher.follow.outlinks.depth.divisor", 2); + + int totalOutlinksToFollow = 0; + for (int i = 0; i < maxOutlinkDepth; i++) { + totalOutlinksToFollow += (int) Math.floor(outlinksDepthDivisor + / (i + 1) * maxOutlinkDepthNumLinks); + } + + LOG.info("Fetcher: maximum outlinks to follow: {}", + Integer.toString(totalOutlinksToFollow)); + } + + JobConf job = new NutchJob(getConf()); + job.setJobName("fetch " + segment); + + job.setInt("fetcher.threads.fetch", threads); + job.set(Nutch.SEGMENT_NAME_KEY, segment.getName()); + + // for politeness, don't permit parallel execution of a single task + job.setSpeculativeExecution(false); + + FileInputFormat.addInputPath(job, new Path(segment, + CrawlDatum.GENERATE_DIR_NAME)); + job.setInputFormat(InputFormat.class); + + job.setMapRunnerClass(Fetcher.class); + + FileOutputFormat.setOutputPath(job, segment); + job.setOutputFormat(FetcherOutputFormat.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(NutchWritable.class); + + JobClient.runJob(job); + + long end = System.currentTimeMillis(); + LOG.info("Fetcher: finished at {}, elapsed: {}", sdf.format(end), + TimingUtil.elapsedTime(start, end)); + } + + /** Run the fetcher. */ + public static void main(String[] args) throws Exception { + int res = ToolRunner.run(NutchConfiguration.create(), new Fetcher(), args); + System.exit(res); + } + + public int run(String[] args) throws Exception { + + String usage = "Usage: Fetcher <segment> [-threads n]"; + + if (args.length < 1) { + System.err.println(usage); + return -1; + } + + Path segment = new Path(args[0]); + + int threads = getConf().getInt("fetcher.threads.fetch", 10); + + for (int i = 1; i < args.length; i++) { // parse command line + if (args[i].equals("-threads")) { // found -threads option + threads = Integer.parseInt(args[++i]); + } + } + + getConf().setInt("fetcher.threads.fetch", threads); + + try { + fetch(segment, threads); + return 0; + } catch (Exception e) { + LOG.error("Fetcher: {}", StringUtils.stringifyException(e)); + return -1; + } + + } + + private void checkConfiguration() { + // ensure that a value has been set for the agent name + String agentName = getConf().get("http.agent.name"); + if (agentName == null || agentName.trim().length() == 0) { + String message = "Fetcher: No agents listed in 'http.agent.name'" + + " property."; + if (LOG.isErrorEnabled()) { + LOG.error(message); + } + throw new IllegalArgumentException(message); + } + } + + private AtomicInteger getActiveThreads() { + return activeThreads; + } + + @Override + public Map<String, Object> run(Map<String, Object> args, String crawlId) throws Exception { + + Map<String, Object> results = new HashMap<String, Object>(); + + Path segment; + if(args.containsKey(Nutch.ARG_SEGMENT)) { + Object seg = args.get(Nutch.ARG_SEGMENT); + if(seg instanceof Path) { + segment = (Path) seg; + } + else { + segment = new Path(seg.toString()); + } + } + else { + String segment_dir = crawlId+"/segments"; + File segmentsDir = new File(segment_dir); + File[] segmentsList = segmentsDir.listFiles(); + Arrays.sort(segmentsList, new Comparator<File>(){ + @Override + public int compare(File f1, File f2) { + if(f1.lastModified()>f2.lastModified()) + return -1; + else + return 0; + } + }); + segment = new Path(segmentsList[0].getPath()); + } + + + int threads = 
getConf().getInt("fetcher.threads.fetch", 10); + + // parse command line + if (args.containsKey("threads")) { // found -threads option + threads = Integer.parseInt((String)args.get("threads")); + } + getConf().setInt("fetcher.threads.fetch", threads); + + try { + fetch(segment, threads); + results.put(Nutch.VAL_RESULT, Integer.toString(0)); + return results; + } catch (Exception e) { + LOG.error("Fetcher: {}", StringUtils.stringifyException(e)); + results.put(Nutch.VAL_RESULT, Integer.toString(-1)); + return results; + } + } + +}
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/fetcher/FetcherOutputFormat.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/fetcher/FetcherOutputFormat.java b/nutch-core/src/main/java/org/apache/nutch/fetcher/FetcherOutputFormat.java new file mode 100644 index 0000000..d526a07 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/fetcher/FetcherOutputFormat.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nutch.fetcher; + +import java.io.IOException; + +import org.apache.nutch.crawl.CrawlDatum; +import org.apache.nutch.crawl.NutchWritable; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.MapFile; +import org.apache.hadoop.io.MapFile.Writer.Option; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.SequenceFile.CompressionType; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.InvalidJobConfException; +import org.apache.hadoop.mapred.OutputFormat; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.SequenceFileOutputFormat; +import org.apache.hadoop.util.Progressable; +import org.apache.nutch.parse.Parse; +import org.apache.nutch.parse.ParseOutputFormat; +import org.apache.nutch.protocol.Content; + +/** Splits FetcherOutput entries into multiple map files. 
*/ +public class FetcherOutputFormat implements OutputFormat<Text, NutchWritable> { + + public void checkOutputSpecs(FileSystem fs, JobConf job) throws IOException { + Path out = FileOutputFormat.getOutputPath(job); + if ((out == null) && (job.getNumReduceTasks() != 0)) { + throw new InvalidJobConfException("Output directory not set in JobConf."); + } + if (fs == null) { + fs = out.getFileSystem(job); + } + if (fs.exists(new Path(out, CrawlDatum.FETCH_DIR_NAME))) + throw new IOException("Segment already fetched!"); + } + + public RecordWriter<Text, NutchWritable> getRecordWriter(final FileSystem fs, + final JobConf job, final String name, final Progressable progress) + throws IOException { + + Path out = FileOutputFormat.getOutputPath(job); + final Path fetch = new Path(new Path(out, CrawlDatum.FETCH_DIR_NAME), name); + final Path content = new Path(new Path(out, Content.DIR_NAME), name); + + final CompressionType compType = SequenceFileOutputFormat + .getOutputCompressionType(job); + + Option fKeyClassOpt = MapFile.Writer.keyClass(Text.class); + org.apache.hadoop.io.SequenceFile.Writer.Option fValClassOpt = SequenceFile.Writer.valueClass(CrawlDatum.class); + org.apache.hadoop.io.SequenceFile.Writer.Option fProgressOpt = SequenceFile.Writer.progressable(progress); + org.apache.hadoop.io.SequenceFile.Writer.Option fCompOpt = SequenceFile.Writer.compression(compType); + + final MapFile.Writer fetchOut = new MapFile.Writer(job, + fetch, fKeyClassOpt, fValClassOpt, fCompOpt, fProgressOpt); + + return new RecordWriter<Text, NutchWritable>() { + private MapFile.Writer contentOut; + private RecordWriter<Text, Parse> parseOut; + + { + if (Fetcher.isStoringContent(job)) { + Option cKeyClassOpt = MapFile.Writer.keyClass(Text.class); + org.apache.hadoop.io.SequenceFile.Writer.Option cValClassOpt = SequenceFile.Writer.valueClass(Content.class); + org.apache.hadoop.io.SequenceFile.Writer.Option cProgressOpt = SequenceFile.Writer.progressable(progress); + org.apache.hadoop.io.SequenceFile.Writer.Option cCompOpt = SequenceFile.Writer.compression(compType); + contentOut = new MapFile.Writer(job, content, + cKeyClassOpt, cValClassOpt, cCompOpt, cProgressOpt); + } + + if (Fetcher.isParsing(job)) { + parseOut = new ParseOutputFormat().getRecordWriter(fs, job, name, + progress); + } + } + + public void write(Text key, NutchWritable value) throws IOException { + + Writable w = value.get(); + + if (w instanceof CrawlDatum) + fetchOut.append(key, w); + else if (w instanceof Content && contentOut != null) + contentOut.append(key, w); + else if (w instanceof Parse && parseOut != null) + parseOut.write(key, (Parse) w); + } + + public void close(Reporter reporter) throws IOException { + fetchOut.close(); + if (contentOut != null) { + contentOut.close(); + } + if (parseOut != null) { + parseOut.close(reporter); + } + } + + }; + + } +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/fetcher/FetcherThread.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/fetcher/FetcherThread.java b/nutch-core/src/main/java/org/apache/nutch/fetcher/FetcherThread.java new file mode 100644 index 0000000..e57e735 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/fetcher/FetcherThread.java @@ -0,0 +1,768 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nutch.fetcher; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.StringUtils; +import org.apache.nutch.crawl.CrawlDatum; +import org.apache.nutch.crawl.NutchWritable; +import org.apache.nutch.crawl.SignatureFactory; +import org.apache.nutch.metadata.Metadata; +import org.apache.nutch.metadata.Nutch; +import org.apache.nutch.net.URLExemptionFilters; +import org.apache.nutch.net.URLFilterException; +import org.apache.nutch.net.URLFilters; +import org.apache.nutch.net.URLNormalizers; +import org.apache.nutch.parse.Outlink; +import org.apache.nutch.parse.Parse; +import org.apache.nutch.parse.ParseData; +import org.apache.nutch.parse.ParseImpl; +import org.apache.nutch.parse.ParseOutputFormat; +import org.apache.nutch.parse.ParseResult; +import org.apache.nutch.parse.ParseSegment; +import org.apache.nutch.parse.ParseStatus; +import org.apache.nutch.parse.ParseText; +import org.apache.nutch.parse.ParseUtil; +import org.apache.nutch.protocol.Content; +import org.apache.nutch.protocol.Protocol; +import org.apache.nutch.protocol.ProtocolFactory; +import org.apache.nutch.protocol.ProtocolOutput; +import org.apache.nutch.protocol.ProtocolStatus; +import org.apache.nutch.scoring.ScoringFilterException; +import org.apache.nutch.scoring.ScoringFilters; +import org.apache.nutch.service.NutchServer; +import org.apache.nutch.util.StringUtil; +import org.apache.nutch.util.URLUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import crawlercommons.robots.BaseRobotRules; + +/** + * This class picks items from queues and fetches the pages. 
+ */ +public class FetcherThread extends Thread { + + private static final Logger LOG = LoggerFactory.getLogger(FetcherThread.class); + + private Configuration conf; + private URLFilters urlFilters; + private URLExemptionFilters urlExemptionFilters; + private ScoringFilters scfilters; + private ParseUtil parseUtil; + private URLNormalizers normalizers; + private ProtocolFactory protocolFactory; + private long maxCrawlDelay; + private String queueMode; + private int maxRedirect; + private String reprUrl; + private boolean redirecting; + private int redirectCount; + private boolean ignoreInternalLinks; + private boolean ignoreExternalLinks; + private String ignoreExternalLinksMode; + + // Used by fetcher.follow.outlinks.depth in parse + private int maxOutlinksPerPage; + private final int maxOutlinks; + private final int interval; + private int maxOutlinkDepth; + private int maxOutlinkDepthNumLinks; + private boolean outlinksIgnoreExternal; + + private int outlinksDepthDivisor; + private boolean skipTruncated; + + private boolean halted = false; + + private AtomicInteger activeThreads; + + private Object fetchQueues; + + private QueueFeeder feeder; + + private Object spinWaiting; + + private AtomicLong lastRequestStart; + + private Reporter reporter; + + private AtomicInteger errors; + + private String segmentName; + + private boolean parsing; + + private OutputCollector<Text, NutchWritable> output; + + private boolean storingContent; + + private AtomicInteger pages; + + private AtomicLong bytes; + + //Used by the REST service + private FetchNode fetchNode; + private boolean reportToNutchServer; + + public FetcherThread(Configuration conf, AtomicInteger activeThreads, FetchItemQueues fetchQueues, + QueueFeeder feeder, AtomicInteger spinWaiting, AtomicLong lastRequestStart, Reporter reporter, + AtomicInteger errors, String segmentName, boolean parsing, OutputCollector<Text, NutchWritable> output, + boolean storingContent, AtomicInteger pages, AtomicLong bytes) { + this.setDaemon(true); // don't hang JVM on exit + this.setName("FetcherThread"); // use an informative name + this.conf = conf; + this.urlFilters = new URLFilters(conf); + this.urlExemptionFilters = new URLExemptionFilters(conf); + this.scfilters = new ScoringFilters(conf); + this.parseUtil = new ParseUtil(conf); + this.skipTruncated = conf.getBoolean(ParseSegment.SKIP_TRUNCATED, true); + this.protocolFactory = new ProtocolFactory(conf); + this.normalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_FETCHER); + this.maxCrawlDelay = conf.getInt("fetcher.max.crawl.delay", 30) * 1000; + this.activeThreads = activeThreads; + this.fetchQueues = fetchQueues; + this.feeder = feeder; + this.spinWaiting = spinWaiting; + this.lastRequestStart = lastRequestStart; + this.reporter = reporter; + this.errors = errors; + this.segmentName = segmentName; + this.parsing = parsing; + this.output = output; + this.storingContent = storingContent; + this.pages = pages; + this.bytes = bytes; + queueMode = conf.get("fetcher.queue.mode", + FetchItemQueues.QUEUE_MODE_HOST); + // check that the mode is known + if (!queueMode.equals(FetchItemQueues.QUEUE_MODE_IP) + && !queueMode.equals(FetchItemQueues.QUEUE_MODE_DOMAIN) + && !queueMode.equals(FetchItemQueues.QUEUE_MODE_HOST)) { + LOG.error("Unknown partition mode : " + queueMode + + " - forcing to byHost"); + queueMode = FetchItemQueues.QUEUE_MODE_HOST; + } + LOG.info("Using queue mode : " + queueMode); + this.maxRedirect = conf.getInt("http.redirect.max", 3); + + maxOutlinksPerPage = 
conf.getInt("db.max.outlinks.per.page", 100); + maxOutlinks = (maxOutlinksPerPage < 0) ? Integer.MAX_VALUE + : maxOutlinksPerPage; + interval = conf.getInt("db.fetch.interval.default", 2592000); + ignoreInternalLinks = conf.getBoolean("db.ignore.internal.links", false); + ignoreExternalLinks = conf.getBoolean("db.ignore.external.links", false); + ignoreExternalLinksMode = conf.get("db.ignore.external.links.mode", "byHost"); + maxOutlinkDepth = conf.getInt("fetcher.follow.outlinks.depth", -1); + outlinksIgnoreExternal = conf.getBoolean( + "fetcher.follow.outlinks.ignore.external", false); + maxOutlinkDepthNumLinks = conf.getInt( + "fetcher.follow.outlinks.num.links", 4); + outlinksDepthDivisor = conf.getInt( + "fetcher.follow.outlinks.depth.divisor", 2); + } + + @SuppressWarnings("fallthrough") + public void run() { + activeThreads.incrementAndGet(); // count threads + + FetchItem fit = null; + try { + // checking for the server to be running and fetcher.parse to be true + if (parsing && NutchServer.getInstance().isRunning()) + reportToNutchServer = true; + + while (true) { + // creating FetchNode for storing in FetchNodeDb + if (reportToNutchServer) + this.fetchNode = new FetchNode(); + else + this.fetchNode = null; + + // check whether must be stopped + if (isHalted()) { + LOG.debug(getName() + " set to halted"); + fit = null; + return; + } + + fit = ((FetchItemQueues) fetchQueues).getFetchItem(); + if (fit == null) { + if (feeder.isAlive() || ((FetchItemQueues) fetchQueues).getTotalSize() > 0) { + LOG.debug(getName() + " spin-waiting ..."); + // spin-wait. + ((AtomicInteger) spinWaiting).incrementAndGet(); + try { + Thread.sleep(500); + } catch (Exception e) { + } + ((AtomicInteger) spinWaiting).decrementAndGet(); + continue; + } else { + // all done, finish this thread + LOG.info("Thread " + getName() + " has no more work available"); + return; + } + } + lastRequestStart.set(System.currentTimeMillis()); + Text reprUrlWritable = (Text) fit.datum.getMetaData().get( + Nutch.WRITABLE_REPR_URL_KEY); + if (reprUrlWritable == null) { + setReprUrl(fit.url.toString()); + } else { + setReprUrl(reprUrlWritable.toString()); + } + try { + // fetch the page + redirecting = false; + redirectCount = 0; + do { + if (LOG.isInfoEnabled()) { + LOG.info("fetching " + fit.url + " (queue crawl delay=" + + ((FetchItemQueues) fetchQueues).getFetchItemQueue(fit.queueID).crawlDelay + + "ms)"); + } + if (LOG.isDebugEnabled()) { + LOG.debug("redirectCount=" + redirectCount); + } + redirecting = false; + Protocol protocol = this.protocolFactory.getProtocol(fit.url + .toString()); + BaseRobotRules rules = protocol.getRobotRules(fit.url, fit.datum); + if (!rules.isAllowed(fit.u.toString())) { + // unblock + ((FetchItemQueues) fetchQueues).finishFetchItem(fit, true); + if (LOG.isDebugEnabled()) { + LOG.debug("Denied by robots.txt: " + fit.url); + } + output(fit.url, fit.datum, null, + ProtocolStatus.STATUS_ROBOTS_DENIED, + CrawlDatum.STATUS_FETCH_GONE); + reporter.incrCounter("FetcherStatus", "robots_denied", 1); + continue; + } + if (rules.getCrawlDelay() > 0) { + if (rules.getCrawlDelay() > maxCrawlDelay && maxCrawlDelay >= 0) { + // unblock + ((FetchItemQueues) fetchQueues).finishFetchItem(fit, true); + LOG.debug("Crawl-Delay for " + fit.url + " too long (" + + rules.getCrawlDelay() + "), skipping"); + output(fit.url, fit.datum, null, + ProtocolStatus.STATUS_ROBOTS_DENIED, + CrawlDatum.STATUS_FETCH_GONE); + reporter.incrCounter("FetcherStatus", + "robots_denied_maxcrawldelay", 1); + continue; + } else { + 
FetchItemQueue fiq = ((FetchItemQueues) fetchQueues) + .getFetchItemQueue(fit.queueID); + fiq.crawlDelay = rules.getCrawlDelay(); + if (LOG.isDebugEnabled()) { + LOG.debug("Crawl delay for queue: " + fit.queueID + + " is set to " + fiq.crawlDelay + + " as per robots.txt. url: " + fit.url); + } + } + } + ProtocolOutput output = protocol.getProtocolOutput(fit.url, + fit.datum); + ProtocolStatus status = output.getStatus(); + Content content = output.getContent(); + ParseStatus pstatus = null; + // unblock queue + ((FetchItemQueues) fetchQueues).finishFetchItem(fit); + + String urlString = fit.url.toString(); + + // used for FetchNode + if (fetchNode != null) { + fetchNode.setStatus(status.getCode()); + fetchNode.setFetchTime(System.currentTimeMillis()); + fetchNode.setUrl(fit.url); + } + + reporter.incrCounter("FetcherStatus", status.getName(), 1); + + switch (status.getCode()) { + + case ProtocolStatus.WOULDBLOCK: + // retry ? + ((FetchItemQueues) fetchQueues).addFetchItem(fit); + break; + + case ProtocolStatus.SUCCESS: // got a page + pstatus = output(fit.url, fit.datum, content, status, + CrawlDatum.STATUS_FETCH_SUCCESS, fit.outlinkDepth); + updateStatus(content.getContent().length); + if (pstatus != null && pstatus.isSuccess() + && pstatus.getMinorCode() == ParseStatus.SUCCESS_REDIRECT) { + String newUrl = pstatus.getMessage(); + int refreshTime = Integer.valueOf(pstatus.getArgs()[1]); + Text redirUrl = handleRedirect(fit.url, fit.datum, urlString, + newUrl, refreshTime < Fetcher.PERM_REFRESH_TIME, + Fetcher.CONTENT_REDIR); + if (redirUrl != null) { + fit = queueRedirect(redirUrl, fit); + } + } + break; + + case ProtocolStatus.MOVED: // redirect + case ProtocolStatus.TEMP_MOVED: + int code; + boolean temp; + if (status.getCode() == ProtocolStatus.MOVED) { + code = CrawlDatum.STATUS_FETCH_REDIR_PERM; + temp = false; + } else { + code = CrawlDatum.STATUS_FETCH_REDIR_TEMP; + temp = true; + } + output(fit.url, fit.datum, content, status, code); + String newUrl = status.getMessage(); + Text redirUrl = handleRedirect(fit.url, fit.datum, urlString, + newUrl, temp, Fetcher.PROTOCOL_REDIR); + if (redirUrl != null) { + fit = queueRedirect(redirUrl, fit); + } else { + // stop redirecting + redirecting = false; + } + break; + + case ProtocolStatus.EXCEPTION: + logError(fit.url, status.getMessage()); + int killedURLs = ((FetchItemQueues) fetchQueues).checkExceptionThreshold(fit + .getQueueID()); + if (killedURLs != 0) + reporter.incrCounter("FetcherStatus", + "AboveExceptionThresholdInQueue", killedURLs); + /* FALLTHROUGH */ + case ProtocolStatus.RETRY: // retry + case ProtocolStatus.BLOCKED: + output(fit.url, fit.datum, null, status, + CrawlDatum.STATUS_FETCH_RETRY); + break; + + case ProtocolStatus.GONE: // gone + case ProtocolStatus.NOTFOUND: + case ProtocolStatus.ACCESS_DENIED: + case ProtocolStatus.ROBOTS_DENIED: + output(fit.url, fit.datum, null, status, + CrawlDatum.STATUS_FETCH_GONE); + break; + + case ProtocolStatus.NOTMODIFIED: + output(fit.url, fit.datum, null, status, + CrawlDatum.STATUS_FETCH_NOTMODIFIED); + break; + + default: + if (LOG.isWarnEnabled()) { + LOG.warn("Unknown ProtocolStatus: " + status.getCode()); + } + output(fit.url, fit.datum, null, status, + CrawlDatum.STATUS_FETCH_RETRY); + } + + if (redirecting && redirectCount > maxRedirect) { + ((FetchItemQueues) fetchQueues).finishFetchItem(fit); + if (LOG.isInfoEnabled()) { + LOG.info(" - redirect count exceeded " + fit.url); + } + output(fit.url, fit.datum, null, + ProtocolStatus.STATUS_REDIR_EXCEEDED, + 
CrawlDatum.STATUS_FETCH_GONE); + } + + } while (redirecting && (redirectCount <= maxRedirect)); + + } catch (Throwable t) { // unexpected exception + // unblock + ((FetchItemQueues) fetchQueues).finishFetchItem(fit); + logError(fit.url, StringUtils.stringifyException(t)); + output(fit.url, fit.datum, null, ProtocolStatus.STATUS_FAILED, + CrawlDatum.STATUS_FETCH_RETRY); + } + } + + } catch (Throwable e) { + if (LOG.isErrorEnabled()) { + LOG.error("fetcher caught:" + e.toString()); + } + } finally { + if (fit != null) + ((FetchItemQueues) fetchQueues).finishFetchItem(fit); + activeThreads.decrementAndGet(); // count threads + LOG.info("-finishing thread " + getName() + ", activeThreads=" + + activeThreads); + } + } + + private Text handleRedirect(Text url, CrawlDatum datum, String urlString, + String newUrl, boolean temp, String redirType) + throws MalformedURLException, URLFilterException { + newUrl = normalizers.normalize(newUrl, URLNormalizers.SCOPE_FETCHER); + newUrl = urlFilters.filter(newUrl); + + try { + String origHost = new URL(urlString).getHost().toLowerCase(); + String newHost = new URL(newUrl).getHost().toLowerCase(); + if (ignoreExternalLinks) { + if (!origHost.equals(newHost)) { + if (LOG.isDebugEnabled()) { + LOG.debug(" - ignoring redirect " + redirType + " from " + + urlString + " to " + newUrl + + " because external links are ignored"); + } + return null; + } + } + + if (ignoreInternalLinks) { + if (origHost.equals(newHost)) { + if (LOG.isDebugEnabled()) { + LOG.debug(" - ignoring redirect " + redirType + " from " + + urlString + " to " + newUrl + + " because internal links are ignored"); + } + return null; + } + } + } catch (MalformedURLException e) { } + + if (newUrl != null && !newUrl.equals(urlString)) { + reprUrl = URLUtil.chooseRepr(reprUrl, newUrl, temp); + url = new Text(newUrl); + if (maxRedirect > 0) { + redirecting = true; + redirectCount++; + if (LOG.isDebugEnabled()) { + LOG.debug(" - " + redirType + " redirect to " + url + + " (fetching now)"); + } + return url; + } else { + CrawlDatum newDatum = new CrawlDatum(CrawlDatum.STATUS_LINKED, + datum.getFetchInterval(), datum.getScore()); + // transfer existing metadata + newDatum.getMetaData().putAll(datum.getMetaData()); + try { + scfilters.initialScore(url, newDatum); + } catch (ScoringFilterException e) { + e.printStackTrace(); + } + if (reprUrl != null) { + newDatum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY, + new Text(reprUrl)); + } + output(url, newDatum, null, null, CrawlDatum.STATUS_LINKED); + if (LOG.isDebugEnabled()) { + LOG.debug(" - " + redirType + " redirect to " + url + + " (fetching later)"); + } + return null; + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug(" - " + redirType + " redirect skipped: " + + (newUrl != null ? 
"to same url" : "filtered")); + } + return null; + } + } + + private FetchItem queueRedirect(Text redirUrl, FetchItem fit) + throws ScoringFilterException { + CrawlDatum newDatum = new CrawlDatum(CrawlDatum.STATUS_DB_UNFETCHED, + fit.datum.getFetchInterval(), fit.datum.getScore()); + // transfer all existing metadata to the redirect + newDatum.getMetaData().putAll(fit.datum.getMetaData()); + scfilters.initialScore(redirUrl, newDatum); + if (reprUrl != null) { + newDatum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY, + new Text(reprUrl)); + } + fit = FetchItem.create(redirUrl, newDatum, queueMode); + if (fit != null) { + FetchItemQueue fiq = ((FetchItemQueues) fetchQueues).getFetchItemQueue(fit.queueID); + fiq.addInProgressFetchItem(fit); + } else { + // stop redirecting + redirecting = false; + reporter.incrCounter("FetcherStatus", "FetchItem.notCreated.redirect", + 1); + } + return fit; + } + + private void logError(Text url, String message) { + if (LOG.isInfoEnabled()) { + LOG.info("fetch of " + url + " failed with: " + message); + } + errors.incrementAndGet(); + } + + private ParseStatus output(Text key, CrawlDatum datum, Content content, + ProtocolStatus pstatus, int status) { + + return output(key, datum, content, pstatus, status, 0); + } + + private ParseStatus output(Text key, CrawlDatum datum, Content content, + ProtocolStatus pstatus, int status, int outlinkDepth) { + + datum.setStatus(status); + datum.setFetchTime(System.currentTimeMillis()); + if (pstatus != null) + datum.getMetaData().put(Nutch.WRITABLE_PROTO_STATUS_KEY, pstatus); + + ParseResult parseResult = null; + if (content != null) { + Metadata metadata = content.getMetadata(); + + // store the guessed content type in the crawldatum + if (content.getContentType() != null) + datum.getMetaData().put(new Text(Metadata.CONTENT_TYPE), + new Text(content.getContentType())); + + // add segment to metadata + metadata.set(Nutch.SEGMENT_NAME_KEY, segmentName); + // add score to content metadata so that ParseSegment can pick it up. + try { + scfilters.passScoreBeforeParsing(key, datum, content); + } catch (Exception e) { + if (LOG.isWarnEnabled()) { + LOG.warn("Couldn't pass score, url " + key + " (" + e + ")"); + } + } + /* + * Note: Fetcher will only follow meta-redirects coming from the + * original URL. + */ + if (parsing && status == CrawlDatum.STATUS_FETCH_SUCCESS) { + if (!skipTruncated + || (skipTruncated && !ParseSegment.isTruncated(content))) { + try { + parseResult = this.parseUtil.parse(content); + } catch (Exception e) { + LOG.warn("Error parsing: " + key + ": " + + StringUtils.stringifyException(e)); + } + } + + if (parseResult == null) { + byte[] signature = SignatureFactory.getSignature(conf) + .calculate(content, new ParseStatus().getEmptyParse(conf)); + datum.setSignature(signature); + } + } + + /* + * Store status code in content So we can read this value during parsing + * (as a separate job) and decide to parse or not. 
+ */ + content.getMetadata().add(Nutch.FETCH_STATUS_KEY, + Integer.toString(status)); + } + + try { + output.collect(key, new NutchWritable(datum)); + if (content != null && storingContent) + output.collect(key, new NutchWritable(content)); + if (parseResult != null) { + for (Entry<Text, Parse> entry : parseResult) { + Text url = entry.getKey(); + Parse parse = entry.getValue(); + ParseStatus parseStatus = parse.getData().getStatus(); + ParseData parseData = parse.getData(); + + if (!parseStatus.isSuccess()) { + LOG.warn("Error parsing: " + key + ": " + parseStatus); + parse = parseStatus.getEmptyParse(conf); + } + + // Calculate page signature. For non-parsing fetchers this will + // be done in ParseSegment + byte[] signature = SignatureFactory.getSignature(conf) + .calculate(content, parse); + // Ensure segment name and score are in parseData metadata + parseData.getContentMeta().set(Nutch.SEGMENT_NAME_KEY, segmentName); + parseData.getContentMeta().set(Nutch.SIGNATURE_KEY, + StringUtil.toHexString(signature)); + // Pass fetch time to content meta + parseData.getContentMeta().set(Nutch.FETCH_TIME_KEY, + Long.toString(datum.getFetchTime())); + if (url.equals(key)) + datum.setSignature(signature); + try { + scfilters.passScoreAfterParsing(url, content, parse); + } catch (Exception e) { + if (LOG.isWarnEnabled()) { + LOG.warn("Couldn't pass score, url " + key + " (" + e + ")"); + } + } + + String origin = null; + + // collect outlinks for subsequent db update + Outlink[] links = parseData.getOutlinks(); + int outlinksToStore = Math.min(maxOutlinks, links.length); + if (ignoreExternalLinks || ignoreInternalLinks) { + URL originURL = new URL(url.toString()); + // based on domain? + if ("bydomain".equalsIgnoreCase(ignoreExternalLinksMode)) { + origin = URLUtil.getDomainName(originURL).toLowerCase(); + } + // use host + else { + origin = originURL.getHost().toLowerCase(); + } + } + + //used by fetchNode + if(fetchNode!=null){ + fetchNode.setOutlinks(links); + fetchNode.setTitle(parseData.getTitle()); + FetchNodeDb.getInstance().put(fetchNode.getUrl().toString(), fetchNode); + } + int validCount = 0; + + // Process all outlinks, normalize, filter and deduplicate + List<Outlink> outlinkList = new ArrayList<Outlink>(outlinksToStore); + HashSet<String> outlinks = new HashSet<String>(outlinksToStore); + for (int i = 0; i < links.length && validCount < outlinksToStore; i++) { + String toUrl = links[i].getToUrl(); + + toUrl = ParseOutputFormat.filterNormalize(url.toString(), toUrl, + origin, ignoreInternalLinks, ignoreExternalLinks, ignoreExternalLinksMode, + urlFilters, urlExemptionFilters, normalizers); + if (toUrl == null) { + continue; + } + + validCount++; + links[i].setUrl(toUrl); + outlinkList.add(links[i]); + outlinks.add(toUrl); + } + + // Only process depth N outlinks + if (maxOutlinkDepth > 0 && outlinkDepth < maxOutlinkDepth) { + reporter.incrCounter("FetcherOutlinks", "outlinks_detected", + outlinks.size()); + + // Counter to limit num outlinks to follow per page + int outlinkCounter = 0; + + // Calculate variable number of outlinks by depth using the + // divisor (outlinks = Math.floor(divisor / depth * num.links)) + int maxOutlinksByDepth = (int) Math.floor(outlinksDepthDivisor + / (outlinkDepth + 1) * maxOutlinkDepthNumLinks); + + String followUrl; + + // Walk over the outlinks and add as new FetchItem to the queues + Iterator<String> iter = outlinks.iterator(); + while (iter.hasNext() && outlinkCounter < maxOutlinkDepthNumLinks) { + followUrl = iter.next(); + + // Check whether we'll 
follow external outlinks + if (outlinksIgnoreExternal) { + if (!URLUtil.getHost(url.toString()).equals( + URLUtil.getHost(followUrl))) { + continue; + } + } + + reporter + .incrCounter("FetcherOutlinks", "outlinks_following", 1); + + // Create new FetchItem with depth incremented + FetchItem fit = FetchItem.create(new Text(followUrl), + new CrawlDatum(CrawlDatum.STATUS_LINKED, interval), + queueMode, outlinkDepth + 1); + ((FetchItemQueues) fetchQueues).addFetchItem(fit); + + outlinkCounter++; + } + } + + // Overwrite the outlinks in ParseData with the normalized and + // filtered set + parseData.setOutlinks(outlinkList.toArray(new Outlink[outlinkList + .size()])); + + output.collect(url, new NutchWritable(new ParseImpl(new ParseText( + parse.getText()), parseData, parse.isCanonical()))); + } + } + } catch (IOException e) { + if (LOG.isErrorEnabled()) { + LOG.error("fetcher caught:" + e.toString()); + } + } + + // return parse status if it exits + if (parseResult != null && !parseResult.isEmpty()) { + Parse p = parseResult.get(content.getUrl()); + if (p != null) { + reporter.incrCounter("ParserStatus", ParseStatus.majorCodes[p + .getData().getStatus().getMajorCode()], 1); + return p.getData().getStatus(); + } + } + return null; + } + + private void updateStatus(int bytesInPage) throws IOException { + pages.incrementAndGet(); + bytes.addAndGet(bytesInPage); + } + + public synchronized void setHalted(boolean halted) { + this.halted = halted; + } + + public synchronized boolean isHalted() { + return halted; + } + + public String getReprUrl() { + return reprUrl; + } + + private void setReprUrl(String urlString) { + this.reprUrl = urlString; + + } + +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/fetcher/QueueFeeder.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/fetcher/QueueFeeder.java b/nutch-core/src/main/java/org/apache/nutch/fetcher/QueueFeeder.java new file mode 100644 index 0000000..79652e7 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/fetcher/QueueFeeder.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nutch.fetcher; + +import java.io.IOException; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.nutch.crawl.CrawlDatum; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class feeds the queues with input items, and re-fills them as items + * are consumed by FetcherThread-s. 
+ */ +public class QueueFeeder extends Thread { + + private static final Logger LOG = LoggerFactory.getLogger(QueueFeeder.class); + + + private RecordReader<Text, CrawlDatum> reader; + private FetchItemQueues queues; + private int size; + private long timelimit = -1; + + public QueueFeeder(RecordReader<Text, CrawlDatum> reader, + FetchItemQueues queues, int size) { + this.reader = reader; + this.queues = queues; + this.size = size; + this.setDaemon(true); + this.setName("QueueFeeder"); + } + + public void setTimeLimit(long tl) { + timelimit = tl; + } + + public void run() { + boolean hasMore = true; + int cnt = 0; + int timelimitcount = 0; + while (hasMore) { + if (System.currentTimeMillis() >= timelimit && timelimit != -1) { + // enough .. lets' simply + // read all the entries from the input without processing them + try { + Text url = new Text(); + CrawlDatum datum = new CrawlDatum(); + hasMore = reader.next(url, datum); + timelimitcount++; + } catch (IOException e) { + LOG.error("QueueFeeder error reading input, record " + cnt, e); + return; + } + continue; + } + int feed = size - queues.getTotalSize(); + if (feed <= 0) { + // queues are full - spin-wait until they have some free space + try { + Thread.sleep(1000); + } catch (Exception e) { + } + ; + continue; + } else { + LOG.debug("-feeding " + feed + " input urls ..."); + while (feed > 0 && hasMore) { + try { + Text url = new Text(); + CrawlDatum datum = new CrawlDatum(); + hasMore = reader.next(url, datum); + if (hasMore) { + queues.addFetchItem(url, datum); + cnt++; + feed--; + } + } catch (IOException e) { + LOG.error("QueueFeeder error reading input, record " + cnt, e); + return; + } + } + } + } + LOG.info("QueueFeeder finished: total " + cnt + + " records + hit by time limit :" + timelimitcount); + } +} http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/fetcher/package.html ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/fetcher/package.html b/nutch-core/src/main/java/org/apache/nutch/fetcher/package.html new file mode 100644 index 0000000..9c843e0 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/fetcher/package.html @@ -0,0 +1,5 @@ +<html> +<body> +The Nutch robot. +</body> +</html> http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/hostdb/HostDatum.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/hostdb/HostDatum.java b/nutch-core/src/main/java/org/apache/nutch/hostdb/HostDatum.java new file mode 100644 index 0000000..424fb1e --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/hostdb/HostDatum.java @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nutch.hostdb; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Date; +import java.util.Map.Entry; +import java.text.SimpleDateFormat; + +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; + +/** + */ +public class HostDatum implements Writable, Cloneable { + protected int failures = 0; + protected float score = 0; + protected Date lastCheck = new Date(0); + protected String homepageUrl = new String(); + + protected MapWritable metaData = new MapWritable(); + + // Records the number of times DNS look-up failed, may indicate host no longer exists + protected int dnsFailures = 0; + + // Records the number of connection failures, may indicate our netwerk being blocked by firewall + protected int connectionFailures = 0; + + protected int unfetched = 0; + protected int fetched = 0; + protected int notModified = 0; + protected int redirTemp = 0; + protected int redirPerm = 0; + protected int gone = 0; + + public HostDatum() { + } + + public HostDatum(float score) { + this(score, new Date()); + } + + public HostDatum(float score, Date lastCheck) { + this(score, lastCheck, new String()); + } + + public HostDatum(float score, Date lastCheck, String homepageUrl) { + this.score = score; + this.lastCheck = lastCheck; + this.homepageUrl = homepageUrl; + } + + public void resetFailures() { + setDnsFailures(0); + setConnectionFailures(0); + } + + public void setDnsFailures(Integer dnsFailures) { + this.dnsFailures = dnsFailures; + } + + public void setConnectionFailures(Integer connectionFailures) { + this.connectionFailures = connectionFailures; + } + + public void incDnsFailures() { + this.dnsFailures++; + } + + public void incConnectionFailures() { + this.connectionFailures++; + } + + public Integer numFailures() { + return getDnsFailures() + getConnectionFailures(); + } + + public Integer getDnsFailures() { + return dnsFailures; + } + + public Integer getConnectionFailures() { + return connectionFailures; + } + + public void setScore(float score) { + this.score = score; + } + + public void setLastCheck() { + setLastCheck(new Date()); + } + + public void setLastCheck(Date date) { + lastCheck = date; + } + + public boolean isEmpty() { + return (lastCheck.getTime() == 0) ? 
true : false; + } + + public float getScore() { + return score; + } + + public Integer numRecords() { + return unfetched + fetched + gone + redirPerm + redirTemp + notModified; + } + + public Date getLastCheck() { + return lastCheck; + } + + public boolean hasHomepageUrl() { + return homepageUrl.length() > 0; + } + + public String getHomepageUrl() { + return homepageUrl; + } + + public void setHomepageUrl(String homepageUrl) { + this.homepageUrl = homepageUrl; + } + + public void setUnfetched(int val) { + unfetched = val; + } + + public int getUnfetched() { + return unfetched; + } + + public void setFetched(int val) { + fetched = val; + } + + public int getFetched() { + return fetched; + } + + public void setNotModified(int val) { + notModified = val; + } + + public int getNotModified() { + return notModified; + } + + public void setRedirTemp(int val) { + redirTemp = val; + } + + public int getRedirTemp() { + return redirTemp; + } + + public void setRedirPerm(int val) { + redirPerm = val; + } + + public int getRedirPerm() { + return redirPerm; + } + + public void setGone(int val) { + gone = val; + } + + public int getGone() { + return gone; + } + + public void resetStatistics() { + setUnfetched(0); + setFetched(0); + setGone(0); + setRedirTemp(0); + setRedirPerm(0); + setNotModified(0); + } + + public void setMetaData(org.apache.hadoop.io.MapWritable mapWritable) { + this.metaData = new org.apache.hadoop.io.MapWritable(mapWritable); + } + + /** + * Add all metadata from another HostDatum to this HostDatum. + * + * @param other HostDatum + */ + public void putAllMetaData(HostDatum other) { + for (Entry<Writable, Writable> e : other.getMetaData().entrySet()) { + getMetaData().put(e.getKey(), e.getValue()); + } + } + + /** + * Returns the MapWritable if it was set or read in {@link #readFields(DataInput)}, + * returns an empty map in case the HostDatum was freshly created (lazily instantiated).
+ */ + public org.apache.hadoop.io.MapWritable getMetaData() { + if (this.metaData == null) this.metaData = new org.apache.hadoop.io.MapWritable(); + return this.metaData; + } + + @Override + public Object clone() throws CloneNotSupportedException { + HostDatum result = (HostDatum)super.clone(); + result.score = score; + result.lastCheck = lastCheck; + result.homepageUrl = homepageUrl; + + result.dnsFailures = dnsFailures; + result.connectionFailures = connectionFailures; + + result.unfetched = unfetched; + result.fetched = fetched; + result.notModified = notModified; + result.redirTemp = redirTemp; + result.redirPerm = redirPerm; + result.gone = gone; + + result.metaData = metaData; + + return result; + } + + @Override + public void readFields(DataInput in) throws IOException { + score = in.readFloat(); + lastCheck = new Date(in.readLong()); + homepageUrl = Text.readString(in); + + dnsFailures = in.readInt(); + connectionFailures = in.readInt(); + + unfetched= in.readInt(); + fetched= in.readInt(); + notModified= in.readInt(); + redirTemp= in.readInt(); + redirPerm = in.readInt(); + gone = in.readInt(); + + metaData = new org.apache.hadoop.io.MapWritable(); + metaData.readFields(in); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeFloat(score); + out.writeLong(lastCheck.getTime()); + Text.writeString(out, homepageUrl); + + out.writeInt(dnsFailures); + out.writeInt(connectionFailures); + + out.writeInt(unfetched); + out.writeInt(fetched); + out.writeInt(notModified); + out.writeInt(redirTemp); + out.writeInt(redirPerm); + out.writeInt(gone); + + metaData.write(out); + } + + @Override + public String toString() { + StringBuilder buf = new StringBuilder(); + buf.append(Integer.toString(getUnfetched())); + buf.append("\t"); + buf.append(Integer.toString(getFetched())); + buf.append("\t"); + buf.append(Integer.toString(getGone())); + buf.append("\t"); + buf.append(Integer.toString(getRedirTemp())); + buf.append("\t"); + buf.append(Integer.toString(getRedirPerm())); + buf.append("\t"); + buf.append(Integer.toString(getNotModified())); + buf.append("\t"); + buf.append(Integer.toString(numRecords())); + buf.append("\t"); + buf.append(Integer.toString(getDnsFailures())); + buf.append("\t"); + buf.append(Integer.toString(getConnectionFailures())); + buf.append("\t"); + buf.append(Integer.toString(numFailures())); + buf.append("\t"); + buf.append(Float.toString(score)); + buf.append("\t"); + buf.append(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(lastCheck)); + buf.append("\t"); + buf.append(homepageUrl); + buf.append("\t"); + for (Entry<Writable, Writable> e : getMetaData().entrySet()) { + buf.append(e.getKey().toString()); + buf.append(':'); + buf.append(e.getValue().toString()); + buf.append("|||"); + } + return buf.toString(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/hostdb/ReadHostDb.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/hostdb/ReadHostDb.java b/nutch-core/src/main/java/org/apache/nutch/hostdb/ReadHostDb.java new file mode 100644 index 0000000..240e109 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/hostdb/ReadHostDb.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nutch.hostdb; + +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.nutch.util.NutchConfiguration; +import org.apache.nutch.util.StringUtil; +import org.apache.nutch.util.TimingUtil; +import org.apache.nutch.util.URLUtil; + +import org.apache.commons.jexl2.JexlContext; +import org.apache.commons.jexl2.Expression; +import org.apache.commons.jexl2.JexlEngine; +import org.apache.commons.jexl2.MapContext; + +/** + * @see http://commons.apache.org/proper/commons-jexl/reference/syntax.html + */ +public class ReadHostDb extends Configured implements Tool { + + public static final Logger LOG = LoggerFactory.getLogger(ReadHostDb.class); + + public static final String HOSTDB_DUMP_HOSTNAMES = "hostdb.dump.hostnames"; + public static final String HOSTDB_DUMP_HOMEPAGES = "hostdb.dump.homepages"; + public static final String HOSTDB_FILTER_EXPRESSION = "hostdb.filter.expression"; + + static class ReadHostDbMapper extends Mapper<Text, HostDatum, Text, Text> { + protected boolean dumpHostnames = false; + protected boolean dumpHomepages = false; + protected Text emptyText = new Text(); + protected Expression expr = null; + + public void setup(Context context) { + dumpHomepages = context.getConfiguration().getBoolean(HOSTDB_DUMP_HOMEPAGES, false); + dumpHostnames = context.getConfiguration().getBoolean(HOSTDB_DUMP_HOSTNAMES, false); + String expr = context.getConfiguration().get(HOSTDB_FILTER_EXPRESSION); + if (expr != null) { + // Create or retrieve a JexlEngine + JexlEngine jexl = new JexlEngine(); + + // Dont't be silent and be strict + jexl.setSilent(true); + jexl.setStrict(true); + + // Create an expression object + this.expr = jexl.createExpression(expr); + } + } + + public void map(Text key, HostDatum datum, Context context) throws IOException, InterruptedException { + if (expr != null) { + // Create a context and 
add data + JexlContext jcontext = new MapContext(); + + // Set some fixed variables + jcontext.set("unfetched", datum.getUnfetched()); + jcontext.set("fetched", datum.getFetched()); + jcontext.set("gone", datum.getGone()); + jcontext.set("redirTemp", datum.getRedirTemp()); + jcontext.set("redirPerm", datum.getRedirPerm()); + jcontext.set("redirs", datum.getRedirPerm() + datum.getRedirTemp()); + jcontext.set("notModified", datum.getNotModified()); + jcontext.set("ok", datum.getFetched() + datum.getNotModified()); + jcontext.set("numRecords", datum.numRecords()); + jcontext.set("dnsFailures", datum.getDnsFailures()); + jcontext.set("connectionFailures", datum.getConnectionFailures()); + + // Set metadata variables + for (Map.Entry<Writable, Writable> entry : datum.getMetaData().entrySet()) { + Object value = entry.getValue(); + + if (value instanceof FloatWritable) { + FloatWritable fvalue = (FloatWritable)value; + Text tkey = (Text)entry.getKey(); + jcontext.set(tkey.toString(), fvalue.get()); + } + + if (value instanceof IntWritable) { + IntWritable ivalue = (IntWritable)value; + Text tkey = (Text)entry.getKey(); + jcontext.set(tkey.toString(), ivalue.get()); + } + } + + // Filter this record if evaluation did not pass + try { + if (!Boolean.TRUE.equals(expr.evaluate(jcontext))) { + return; + } + } catch (Exception e) { + LOG.info(e.toString() + " for " + key.toString()); + } + } + + if (dumpHomepages) { + if (datum.hasHomepageUrl()) { + context.write(new Text(datum.getHomepageUrl()), emptyText); + } + return; + } + + if (dumpHostnames) { + context.write(key, emptyText); + return; + } + + // Write anyway + context.write(key, new Text(datum.toString())); + } + } + + // Todo, reduce unknown hosts to single unknown domain if possible. Enable via configuration + // host_a.example.org,host_a.example.org ==> example.org +// static class ReadHostDbReducer extends Reduce<Text, Text, Text, Text> { +// public void setup(Context context) { } +// +// public void reduce(Text domain, Iterable<Text> hosts, Context context) throws IOException, InterruptedException { +// +// } +// } + + private void readHostDb(Path hostDb, Path output, boolean dumpHomepages, boolean dumpHostnames, String expr) throws Exception { + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + long start = System.currentTimeMillis(); + LOG.info("ReadHostDb: starting at " + sdf.format(start)); + + Configuration conf = getConf(); + conf.setBoolean(HOSTDB_DUMP_HOMEPAGES, dumpHomepages); + conf.setBoolean(HOSTDB_DUMP_HOSTNAMES, dumpHostnames); + if (expr != null) { + conf.set(HOSTDB_FILTER_EXPRESSION, expr); + } + conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false); + conf.set("mapred.textoutputformat.separator", "\t"); + + Job job = new Job(conf, "ReadHostDb"); + job.setJarByClass(ReadHostDb.class); + + FileInputFormat.addInputPath(job, new Path(hostDb, "current")); + FileOutputFormat.setOutputPath(job, output); + + job.setJarByClass(ReadHostDb.class); + job.setMapperClass(ReadHostDbMapper.class); + + job.setInputFormatClass(SequenceFileInputFormat.class); + job.setOutputFormatClass(TextOutputFormat.class); + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(Text.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(Text.class); + job.setNumReduceTasks(0); + + try { + job.waitForCompletion(true); + } catch (Exception e) { + throw e; + } + + long end = System.currentTimeMillis(); + LOG.info("ReadHostDb: finished at " + sdf.format(end) + ", elapsed: " + 
TimingUtil.elapsedTime(start, end)); + } + + public static void main(String args[]) throws Exception { + int res = ToolRunner.run(NutchConfiguration.create(), new ReadHostDb(), args); + System.exit(res); + } + + public int run(String[] args) throws Exception { + if (args.length < 2) { + System.err.println("Usage: ReadHostDb <hostdb> <output> [-dumpHomepages | -dumpHostnames | -expr <expr.>]"); + return -1; + } + + boolean dumpHomepages = false; + boolean dumpHostnames = false; + String expr = null; + + for (int i = 0; i < args.length; i++) { + if (args[i].equals("-dumpHomepages")) { + LOG.info("ReadHostDb: dumping homepage URL's"); + dumpHomepages = true; + } + if (args[i].equals("-dumpHostnames")) { + LOG.info("ReadHostDb: dumping hostnames"); + dumpHostnames = true; + } + if (args[i].equals("-expr")) { + expr = args[i + 1]; + LOG.info("ReadHostDb: evaluating expression: " + expr); + i++; + } + } + + try { + readHostDb(new Path(args[0]), new Path(args[1]), dumpHomepages, dumpHostnames, expr); + return 0; + } catch (Exception e) { + LOG.error("ReadHostDb: " + StringUtils.stringifyException(e)); + return -1; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/hostdb/ResolverThread.java ---------------------------------------------------------------------- diff --git a/nutch-core/src/main/java/org/apache/nutch/hostdb/ResolverThread.java b/nutch-core/src/main/java/org/apache/nutch/hostdb/ResolverThread.java new file mode 100644 index 0000000..e7c7978 --- /dev/null +++ b/nutch-core/src/main/java/org/apache/nutch/hostdb/ResolverThread.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nutch.hostdb; + +import java.net.InetAddress; +import java.net.UnknownHostException; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Simple runnable that performs DNS lookup for a single host. + */ +public class ResolverThread implements Runnable { + + public static final Logger LOG = LoggerFactory.getLogger(ResolverThread.class); + + protected String host = null; + protected HostDatum datum = null; + protected Text hostText = new Text(); + protected OutputCollector<Text,HostDatum> output; + protected Reporter reporter; + protected int purgeFailedHostsThreshold; + + /** + * Constructor. 
+ */ + public ResolverThread(String host, HostDatum datum, + OutputCollector<Text,HostDatum> output, Reporter reporter, int purgeFailedHostsThreshold) { + + hostText.set(host); + this.host = host; + this.datum = datum; + this.output = output; + this.reporter = reporter; + this.purgeFailedHostsThreshold = purgeFailedHostsThreshold; + } + + /** + * Resolves the host via DNS and updates the datum and job counters accordingly. + */ + public void run() { + // Resolve the host and act appropriately + try { + // Throws an exception if host is not found + InetAddress inetAddr = InetAddress.getByName(host); + + if (datum.isEmpty()) { + reporter.incrCounter("UpdateHostDb", "new_known_host", 1); + datum.setLastCheck(); + LOG.info(host + ": new_known_host " + datum); + } else if (datum.getDnsFailures() > 0) { + reporter.incrCounter("UpdateHostDb", "rediscovered_host", 1); + datum.setLastCheck(); + datum.setDnsFailures(0); + LOG.info(host + ": rediscovered_host " + datum); + } else { + reporter.incrCounter("UpdateHostDb", "existing_known_host", 1); + datum.setLastCheck(); + LOG.info(host + ": existing_known_host " + datum); + } + + // Write the host datum + output.collect(hostText, datum); + } catch (UnknownHostException e) { + try { + // If the counter is empty we'll initialize with date = today and 1 failure + if (datum.isEmpty()) { + datum.setLastCheck(); + datum.setDnsFailures(1); + output.collect(hostText, datum); + reporter.incrCounter("UpdateHostDb", "new_unknown_host", 1); + LOG.info(host + ": new_unknown_host " + datum); + } else { + datum.setLastCheck(); + datum.incDnsFailures(); + + // Check if this host should be forgotten + if (purgeFailedHostsThreshold == -1 || + purgeFailedHostsThreshold < datum.getDnsFailures()) { + + output.collect(hostText, datum); + reporter.incrCounter("UpdateHostDb", "existing_unknown_host", 1); + LOG.info(host + ": existing_unknown_host " + datum); + } else { + reporter.incrCounter("UpdateHostDb", "purged_unknown_host", 1); + LOG.info(host + ": purged_unknown_host " + datum); + } + } + + reporter.incrCounter("UpdateHostDb", + Integer.toString(datum.numFailures()) + "_times_failed", 1); + } catch (Exception ioe) { + LOG.warn(StringUtils.stringifyException(ioe)); + } + } catch (Exception e) { + LOG.warn(StringUtils.stringifyException(e)); + } + + reporter.incrCounter("UpdateHostDb", "checked_hosts", 1); + } +} \ No newline at end of file
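
A note on the -expr option of ReadHostDb above: the mapper binds the HostDatum statistics (unfetched, fetched, gone, redirTemp, redirPerm, redirs, notModified, ok, numRecords, dnsFailures, connectionFailures, plus any numeric metadata keys) into a JEXL context and only emits records for which the expression evaluates to true. The following standalone sketch mirrors that evaluation; the class name HostDbExprDemo, the example expression and the sample values are invented for illustration and are not part of the patch.

import org.apache.commons.jexl2.Expression;
import org.apache.commons.jexl2.JexlContext;
import org.apache.commons.jexl2.JexlEngine;
import org.apache.commons.jexl2.MapContext;

public class HostDbExprDemo {
  public static void main(String[] args) {
    // Same engine settings as ReadHostDbMapper.setup()
    JexlEngine jexl = new JexlEngine();
    jexl.setSilent(true);
    jexl.setStrict(true);

    // Hypothetical filter: hosts with a backlog of unfetched URLs and no DNS trouble
    Expression expr = jexl.createExpression("unfetched > 100 && dnsFailures == 0");

    // In the mapper these values come from the current HostDatum
    JexlContext context = new MapContext();
    context.set("unfetched", 250);
    context.set("dnsFailures", 0);

    // ReadHostDbMapper drops the record unless the result is Boolean.TRUE
    System.out.println("keep record? " + Boolean.TRUE.equals(expr.evaluate(context)));
  }
}

Following the usage string printed by ReadHostDb.run(), the same filter would be passed on the command line as: ReadHostDb <hostdb> <output> -expr 'unfetched > 100 && dnsFailures == 0' (quoting depends on the shell).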

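
HostDatum.readFields() reads fields in exactly the order write() emits them (score, lastCheck, homepageUrl, dnsFailures, connectionFailures, the six status counters, then the metadata map), so a quick round-trip is a handy sanity check whenever the wire format changes. A minimal sketch, assuming the HostDatum class from this patch is on the classpath; the class name HostDatumRoundTrip and the sample values are illustrative only.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.util.Date;

import org.apache.nutch.hostdb.HostDatum;

public class HostDatumRoundTrip {
  public static void main(String[] args) throws Exception {
    HostDatum original = new HostDatum(1.5f, new Date(), "http://example.org/");
    original.setFetched(42);
    original.setUnfetched(7);

    // Serialize via Writable.write(DataOutput)
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    original.write(new DataOutputStream(bytes));

    // Deserialize via Writable.readFields(DataInput)
    HostDatum copy = new HostDatum();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

    System.out.println(copy.getFetched());      // 42
    System.out.println(copy.getHomepageUrl());  // http://example.org/
  }
}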