Author: ab
Date: Tue Dec  1 15:15:00 2009
New Revision: 885785

URL: http://svn.apache.org/viewvc?rev=885785&view=rev
Log:
NUTCH-769 Fetcher to skip queues for URLs getting repeated exceptions.

Modified:
    lucene/nutch/trunk/CHANGES.txt
    lucene/nutch/trunk/conf/nutch-default.xml
    lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java

Modified: lucene/nutch/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/CHANGES.txt?rev=885785&r1=885784&r2=885785&view=diff
==============================================================================
--- lucene/nutch/trunk/CHANGES.txt (original)
+++ lucene/nutch/trunk/CHANGES.txt Tue Dec  1 15:15:00 2009
@@ -2,6 +2,9 @@
 
 Unreleased Changes
 
+* NUTCH-769 Fetcher to skip queues for URLS getting repeated exceptions
+  (Julien Nioche via ab)
+
 * NUTCH-768 - Upgrade Nutch 1.0 to use Hadoop 0.20.1, also upgrades Xerces to 
   version 2.9.1. (kubes)
   

Modified: lucene/nutch/trunk/conf/nutch-default.xml
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/conf/nutch-default.xml?rev=885785&r1=885784&r2=885785&view=diff
==============================================================================
--- lucene/nutch/trunk/conf/nutch-default.xml (original)
+++ lucene/nutch/trunk/conf/nutch-default.xml Tue Dec  1 15:15:00 2009
@@ -610,6 +610,16 @@
   </description>
 </property>
 
+<property>
+  <name>fetcher.max.exceptions.per.queue</name>
+  <value>-1</value>
+  <description>The maximum number of protocol-level exceptions (e.g. timeouts) per
+  host (or IP) queue. Once this value is reached, any remaining entries from this
+  queue are purged, effectively stopping the fetching from this host/IP. The default
+  value of -1 deactivates this limit.
+  </description>
+</property>
+
 <!-- indexer properties -->
 
 <property>

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java?rev=885785&r1=885784&r2=885785&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java Tue Dec  1 15:15:00 2009
@@ -208,6 +208,7 @@
     List<FetchItem> queue = Collections.synchronizedList(new LinkedList<FetchItem>());
     Set<FetchItem>  inProgress = Collections.synchronizedSet(new HashSet<FetchItem>());
     AtomicLong nextFetchTime = new AtomicLong();
+    AtomicInteger exceptionCounter = new AtomicInteger();
     long crawlDelay;
     long minCrawlDelay;
     int maxThreads;
@@ -236,6 +237,10 @@
       return inProgress.size();
     }
     
+    public int incrementExceptionCounter() {
+      return exceptionCounter.incrementAndGet();
+    }
+    
     public void finishFetchItem(FetchItem it, boolean asap) {
       if (it != null) {
         inProgress.remove(it);
@@ -306,6 +311,7 @@
     long crawlDelay;
     long minCrawlDelay;
     long timelimit = -1;
+    int maxExceptionsPerQueue = -1;
     Configuration conf;    
     
     public FetchItemQueues(Configuration conf) {
@@ -316,6 +322,7 @@
       this.crawlDelay = (long) (conf.getFloat("fetcher.server.delay", 1.0f) * 1000);
       this.minCrawlDelay = (long) (conf.getFloat("fetcher.server.min.delay", 0.0f) * 1000);
       this.timelimit = conf.getLong("fetcher.timelimit.mins", -1);
+      this.maxExceptionsPerQueue = conf.getInt("fetcher.max.exceptions.per.queue", -1);
     }
     
     public int getTotalSize() {
@@ -402,6 +409,36 @@
       return count;
     }
     
+    /**
+     * Increment the exception counter of a queue in case of an exception e.g.
+     * timeout; when higher than a given threshold simply empty the queue.
+     *
+     * @param queueid
+     * @return number of purged items
+     */
+    public synchronized int checkExceptionThreshold(String queueid) {
+      FetchItemQueue fiq = queues.get(queueid);
+      if (fiq == null) {
+        return 0;
+      }
+      if (fiq.getQueueSize() == 0) {
+        return 0;
+      }
+      int excCount = fiq.incrementExceptionCounter();
+      if (maxExceptionsPerQueue != -1 && excCount >= maxExceptionsPerQueue) {
+        // too many exceptions for items in this queue - purge it
+        int deleted = fiq.emptyQueue();
+        LOG.info("* queue: " + queueid + " >> removed " + deleted
+            + " URLs from queue because " + excCount + " exceptions occurred");
+        for (int i = 0; i < deleted; i++) {
+          totalSize.decrementAndGet();
+        }
+        return deleted;
+      }
+      return 0;
+    }
+
+    
     public synchronized void dump() {
       for (String id : queues.keySet()) {
         FetchItemQueue fiq = queues.get(id);
@@ -673,6 +710,8 @@
 
               case ProtocolStatus.EXCEPTION:
                 logError(fit.url, status.getMessage());
+                int killedURLs = fetchQueues.checkExceptionThreshold(fit.getQueueID());
+                reporter.incrCounter("FetcherStatus", "Exceptions", killedURLs);
                 /* FALLTHROUGH */
               case ProtocolStatus.RETRY:          // retry
               case ProtocolStatus.BLOCKED:


Reply via email to