Author: markus
Date: Thu Dec 29 14:32:50 2011
New Revision: 1225543

URL: http://svn.apache.org/viewvc?rev=1225543&view=rev
Log:
NUTCH-1238 Fetcher throughput threshold must start before feeder finished

Modified:
    nutch/trunk/conf/nutch-default.xml
    nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java

Modified: nutch/trunk/conf/nutch-default.xml
URL: http://svn.apache.org/viewvc/nutch/trunk/conf/nutch-default.xml?rev=1225543&r1=1225542&r2=1225543&view=diff
==============================================================================
--- nutch/trunk/conf/nutch-default.xml (original)
+++ nutch/trunk/conf/nutch-default.xml Thu Dec 29 14:32:50 2011
@@ -723,6 +723,12 @@
 </property>
 
 <property>
+  <name>fetcher.throughput.threshold.check.after</name>
+  <value>5</value>
+  <description>The number of minutes after which the throughput check is 
enabled.</description>
+</property>
+
+<property>
   <name>fetcher.threads.timeout.divisor</name>
   <value>2</value>
   <description>(EXPERT)The thread time-out divisor to use. By default threads 
have a time-out

Modified: nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java?rev=1225543&r1=1225542&r2=1225543&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java Thu Dec 29 14:32:50 2011
@@ -1180,6 +1180,7 @@ public class Fetcher extends Configured 
     if (LOG.isInfoEnabled()) { LOG.info("Fetcher: throughput threshold: " + 
throughputThresholdPages); }
     int throughputThresholdMaxRetries = 
getConf().getInt("fetcher.throughput.threshold.retries", 5);
     if (LOG.isInfoEnabled()) { LOG.info("Fetcher: throughput threshold 
retries: " + throughputThresholdMaxRetries); }
+    long throughputThresholdTimeLimit = 
getConf().getLong("fetcher.throughput.threshold.check.after", -1);
 
     do {                                          // wait for threads to exit
       pagesLastSec = pages.get();
@@ -1204,15 +1205,9 @@ public class Fetcher extends Configured 
       }
 
       // if throughput threshold is enabled
-      if (!feeder.isAlive() && throughputThresholdPages != -1) {
-        // Have we reached the threshold of pages/second and threshold was not 
yet exceeded
-        if (pagesLastSec > throughputThresholdPages && 
!throughputThresholdExceeded) {
-          LOG.info("Exceding " + Integer.toString(throughputThresholdPages) + 
" pages/second");
-          throughputThresholdExceeded = true;
-        }
-
+      if (throughputThresholdTimeLimit < System.currentTimeMillis() && 
throughputThresholdPages != -1) {
         // Check if we're dropping below the threshold
-        if (throughputThresholdExceeded && pagesLastSec < 
throughputThresholdPages) {
+        if (pagesLastSec < throughputThresholdPages) {
           throughputThresholdNumRetries++;
           LOG.warn(Integer.toString(throughputThresholdNumRetries) + ": 
dropping below configured threshold of " + 
Integer.toString(throughputThresholdPages) + " pages per second");
 
@@ -1274,6 +1269,11 @@ public class Fetcher extends Configured 
       getConf().setLong("fetcher.timelimit", timelimit);
     }
 
+    // Set the time limit after which the throughput threshold feature is 
enabled
+    timelimit = getConf().getLong("fetcher.throughput.threshold.check.after", 
10);
+    timelimit = System.currentTimeMillis() + (timelimit * 60 * 1000);
+    getConf().setLong("fetcher.throughput.threshold.check.after", timelimit);
+
     int maxOutlinkDepth = getConf().getInt("fetcher.follow.outlinks.depth", 
-1);
     if (maxOutlinkDepth > 0) {
       LOG.info("Fetcher: following outlinks up to depth: " + 
Integer.toString(maxOutlinkDepth));


Reply via email to