Author: jnioche
Date: Sat May 10 12:39:11 2014
New Revision: 1593694
URL: http://svn.apache.org/r1593694
Log:
NUTCH-207 Bandwidth target for fetcher rather than a thread count
Modified:
nutch/trunk/CHANGES.txt
nutch/trunk/conf/nutch-default.xml
nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java
Modified: nutch/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/nutch/trunk/CHANGES.txt?rev=1593694&r1=1593693&r2=1593694&view=diff
==============================================================================
--- nutch/trunk/CHANGES.txt (original)
+++ nutch/trunk/CHANGES.txt Sat May 10 12:39:11 2014
@@ -2,6 +2,8 @@ Nutch Change Log
Nutch Current Development
+* NUTCH-207 Bandwidth target for fetcher rather than a thread count (jnioche)
+
* NUTCH-1182 fetcher to log hung threads (snagel)
* NUTCH-1759 Upgrade to Crawler Commons 0.4 (jnioche)
Modified: nutch/trunk/conf/nutch-default.xml
URL: http://svn.apache.org/viewvc/nutch/trunk/conf/nutch-default.xml?rev=1593694&r1=1593693&r2=1593694&view=diff
==============================================================================
--- nutch/trunk/conf/nutch-default.xml (original)
+++ nutch/trunk/conf/nutch-default.xml Sat May 10 12:39:11 2014
@@ -838,6 +838,28 @@
</description>
</property>
+<property>
+ <name>fetcher.bandwidth.target</name>
+ <value>-1</value>
+ <description>Target bandwidth in kilobits per sec for each mapper instance. This is used to adjust the number of
+ fetching threads automatically (up to fetcher.maxNum.threads). A value of -1 deactivates the functionality, in which case
+ the number of fetching threads is fixed (see fetcher.threads.fetch).</description>
+</property>
+
+<property>
+ <name>fetcher.maxNum.threads</name>
+ <value>25</value>
+ <description>Max number of fetch threads allowed when using fetcher.bandwidth.target. Defaults to fetcher.threads.fetch if unspecified or
+ set to a value lower than it. </description>
+</property>
+
+<property>
+ <name>fetcher.bandwidth.target.check.everyNSecs</name>
+ <value>30</value>
+ <description>(EXPERT) Value in seconds which determines how frequently we should reassess the optimal number of fetch threads when using
+ fetcher.bandwidth.target. Defaults to 30 and must be at least 1.</description>
+</property>
+
<!-- moreindexingfilter plugin properties -->
<property>
Modified: nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java?rev=1593694&r1=1593693&r2=1593694&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java Sat May 10 12:39:11 2014
@@ -595,6 +595,8 @@ public class Fetcher extends Configured
private int outlinksDepthDivisor;
private boolean skipTruncated;
+
+ private boolean halted = false;
public FetcherThread(Configuration conf) {
this.setDaemon(true); // don't hang JVM on exit
@@ -637,6 +639,13 @@ public class Fetcher extends Configured
try {
while (true) {
+ // check whether must be stopped
+ if (isHalted()) {
+ LOG.debug(getName() + " set to halted");
+ fit = null;
+ return;
+ }
+
fit = fetchQueues.getFetchItem();
if (fit == null) {
if (feeder.isAlive() || fetchQueues.getTotalSize() > 0) {
@@ -650,6 +659,7 @@ public class Fetcher extends Configured
continue;
} else {
// all done, finish this thread
+ LOG.info("Thread " + getName() + " has no more work available");
return;
}
}
@@ -666,8 +676,8 @@ public class Fetcher extends Configured
redirecting = false;
redirectCount = 0;
do {
- if (LOG.isInfoEnabled()) {
- LOG.info("fetching " + fit.url + " (queue crawl delay=" +
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("fetching " + fit.url + " (queue crawl delay=" +
fetchQueues.getFetchItemQueue(fit.queueID).crawlDelay
+ "ms)");
}
if (LOG.isDebugEnabled()) {
@@ -1099,6 +1109,14 @@ public class Fetcher extends Configured
return null;
}
+ public synchronized void setHalted(boolean halted) {
+ this.halted = halted;
+ }
+
+ public synchronized boolean isHalted() {
+ return halted;
+ }
+
}
public Fetcher() { super(null); }
@@ -1201,7 +1219,24 @@ public class Fetcher extends Configured
int throughputThresholdMaxRetries =
getConf().getInt("fetcher.throughput.threshold.retries", 5);
if (LOG.isInfoEnabled()) { LOG.info("Fetcher: throughput threshold
retries: " + throughputThresholdMaxRetries); }
long throughputThresholdTimeLimit =
getConf().getLong("fetcher.throughput.threshold.check.after", -1);
-
+
+ int targetBandwidth = getConf().getInt("fetcher.bandwidth.target", -1) *
1000;
+ int maxNumThreads = getConf().getInt("fetcher.maxNum.threads",
threadCount);
+ if (maxNumThreads < threadCount){
+ LOG.info("fetcher.maxNum.threads can't be < than "+ threadCount + " :
using "+threadCount+" instead");
+ maxNumThreads = threadCount;
+ }
+ int bandwidthTargetCheckEveryNSecs =
getConf().getInt("fetcher.bandwidth.target.check.everyNSecs", 30);
+ if (bandwidthTargetCheckEveryNSecs < 1){
+ LOG.info("fetcher.bandwidth.target.check.everyNSecs can't be < to 1 :
using 1 instead");
+ bandwidthTargetCheckEveryNSecs = 1;
+ }
+
+ int maxThreadsPerQueue = getConf().getInt("fetcher.threads.per.queue", 1);
+
+ int bandwidthTargetCheckCounter = 0;
+ long bytesAtLastBWTCheck = 0l;
+
do { // wait for threads to exit
pagesLastSec = pages.get();
bytesLastSec = (int)bytes.get();
@@ -1218,7 +1253,7 @@ public class Fetcher extends Configured
reportStatus(pagesLastSec, bytesLastSec);
LOG.info("-activeThreads=" + activeThreads + ", spinWaiting=" +
spinWaiting.get()
- + ", fetchQueues.totalSize=" + fetchQueues.getTotalSize());
+ + ", fetchQueues.totalSize=" + fetchQueues.getTotalSize()+ ",
fetchQueues.getQueueCount="+fetchQueues.getQueueCount());
if (!feeder.isAlive() && fetchQueues.getTotalSize() < 5) {
fetchQueues.dump();
@@ -1246,6 +1281,57 @@ public class Fetcher extends Configured
}
}
}
+
+ // adjust the number of threads if a target bandwidth has been set
+ if (targetBandwidth>0) {
+ if (bandwidthTargetCheckCounter < bandwidthTargetCheckEveryNSecs)
bandwidthTargetCheckCounter++;
+ else if (bandwidthTargetCheckCounter ==
bandwidthTargetCheckEveryNSecs){
+ long bpsSinceLastCheck = ((bytes.get() - bytesAtLastBWTCheck) *
8)/bandwidthTargetCheckEveryNSecs;
+
+ bytesAtLastBWTCheck = bytes.get();
+ bandwidthTargetCheckCounter = 0;
+
+ int averageBdwPerThread = 0;
+ if (activeThreads.get()>0)
+ averageBdwPerThread =
Math.round(bpsSinceLastCheck/activeThreads.get());
+
+ LOG.info("averageBdwPerThread : "+(averageBdwPerThread/1000) + "
kbps");
+
+ if (bpsSinceLastCheck < targetBandwidth && averageBdwPerThread > 0){
+ // check whether it is worth doing e.g. more queues than threads
+
+ if ((fetchQueues.getQueueCount() * maxThreadsPerQueue) >
activeThreads.get()){
+
+ long remainingBdw = targetBandwidth - bpsSinceLastCheck;
+ int additionalThreads =
Math.round(remainingBdw/averageBdwPerThread);
+ int availableThreads = maxNumThreads - activeThreads.get();
+
+ // determine the number of available threads (min between
availableThreads and additionalThreads)
+ additionalThreads = (availableThreads < additionalThreads ?
availableThreads:additionalThreads);
+ LOG.info("Has space for more threads ("+(bpsSinceLastCheck/1000)
+" vs "+(targetBandwidth/1000)+" kbps) \t=> adding "+additionalThreads+" new
threads");
+ // activate new threads
+ for (int i = 0; i < additionalThreads; i++) {
+ FetcherThread thread = new FetcherThread(getConf());
+ fetcherThreads.add(thread);
+ thread.start();
+ }
+ }
+ }
+ else if (bpsSinceLastCheck > targetBandwidth && averageBdwPerThread
> 0){
+ // if the bandwidth we're using is greater then the expected
bandwidth, we have to stop some threads
+ long excessBdw = bpsSinceLastCheck - targetBandwidth;
+ int excessThreads = Math.round(excessBdw/averageBdwPerThread);
+ LOG.info("Exceeding target bandwidth ("+bpsSinceLastCheck/1000 +"
vs "+(targetBandwidth/1000)+" kbps). \t=> excessThreads = "+excessThreads);
+ // keep at least one
+ if (excessThreads >= fetcherThreads.size()) excessThreads = 0;
+ // de-activates threads
+ for (int i = 0; i < excessThreads; i++) {
+ FetcherThread thread = fetcherThreads.removeLast();
+ thread.setHalted(true);
+ }
+ }
+ }
+ }
// check timelimit
if (!feeder.isAlive()) {