This is an automated email from the ASF dual-hosted git repository.
markus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nutch.git
The following commit(s) were added to refs/heads/master by this push:
new 0cdd095 NUTCH-2445 Fetcher following outlinks to keep track of already fetched items
0cdd095 is described below
commit 0cdd095c881eed52dc461e559ce6ae278e99157f
Author: Markus Jelsma <[email protected]>
AuthorDate: Mon Oct 23 15:59:13 2017 +0200
NUTCH-2445 Fetcher following outlinks to keep track of already fetched items
---
.../org/apache/nutch/fetcher/FetchItemQueue.java | 6 ++++
.../org/apache/nutch/fetcher/FetcherThread.java | 41 ++++++++++++++--------
2 files changed, 32 insertions(+), 15 deletions(-)
diff --git a/src/java/org/apache/nutch/fetcher/FetchItemQueue.java b/src/java/org/apache/nutch/fetcher/FetchItemQueue.java
index b67be74..5096b37 100644
--- a/src/java/org/apache/nutch/fetcher/FetchItemQueue.java
+++ b/src/java/org/apache/nutch/fetcher/FetchItemQueue.java
@@ -22,6 +22,8 @@ import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.HashSet;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
@@ -51,6 +53,10 @@ public class FetchItemQueue {
Text cookie;
Text variableFetchDelayKey = new Text("_variableFetchDelay_");
boolean variableFetchDelaySet = false;
+ // keep track of duplicates if fetcher.follow.outlinks.depth > 0. Some urls
may
+ // not get followed due to hash collisions. Hashing is used to reduce memory
+ // usage.
+ Set<Integer> alreadyFetched = new HashSet<>();
public FetchItemQueue(Configuration conf, int maxThreads, long crawlDelay,
long minCrawlDelay) {
diff --git a/src/java/org/apache/nutch/fetcher/FetcherThread.java b/src/java/org/apache/nutch/fetcher/FetcherThread.java
index 77947b6..42d5d50 100644
--- a/src/java/org/apache/nutch/fetcher/FetcherThread.java
+++ b/src/java/org/apache/nutch/fetcher/FetcherThread.java
@@ -198,7 +198,7 @@ public class FetcherThread extends Thread {
+ " - forcing to byHost");
queueMode = FetchItemQueues.QUEUE_MODE_HOST;
}
- LOG.info("Using queue mode : " + queueMode);
+ LOG.info(getName() + " " + Thread.currentThread().getId() + " Using queue
mode : " + queueMode);
this.maxRedirect = conf.getInt("http.redirect.max", 3);
maxOutlinksPerPage = conf.getInt("db.max.outlinks.per.page", 100);
@@ -219,7 +219,7 @@ public class FetcherThread extends Thread {
if (storingContent) {
robotsTxtContent = new LinkedList<>();
} else {
- LOG.warn("Ignoring fetcher.store.robotstxt because not storing content
(fetcher.store.content)!");
+ LOG.warn(getName() + " " + Thread.currentThread().getId() + " Ignoring
fetcher.store.robotstxt because not storing content (fetcher.store.content)!");
}
}
}
@@ -262,7 +262,7 @@ public class FetcherThread extends Thread {
continue;
} else {
// all done, finish this thread
- LOG.info("Thread " + getName() + " has no more work available");
+ LOG.info(getName() + " " + Thread.currentThread().getId() + " has
no more work available");
return;
}
}
@@ -287,7 +287,7 @@ public class FetcherThread extends Thread {
do {
if (LOG.isInfoEnabled()) {
- LOG.info("fetching " + fit.url + " (queue crawl delay="
+ LOG.info(getName() + " " + Thread.currentThread().getId() + "
fetching " + fit.url + " (queue crawl delay="
+ ((FetchItemQueues)
fetchQueues).getFetchItemQueue(fit.queueID).crawlDelay
+ "ms)");
}
@@ -438,7 +438,7 @@ public class FetcherThread extends Thread {
default:
if (LOG.isWarnEnabled()) {
- LOG.warn("Unknown ProtocolStatus: " + status.getCode());
+ LOG.warn(getName() + " " + Thread.currentThread().getId() + "
Unknown ProtocolStatus: " + status.getCode());
}
output(fit.url, fit.datum, null, status,
CrawlDatum.STATUS_FETCH_RETRY);
@@ -447,7 +447,7 @@ public class FetcherThread extends Thread {
if (redirecting && redirectCount > maxRedirect) {
((FetchItemQueues) fetchQueues).finishFetchItem(fit);
if (LOG.isInfoEnabled()) {
- LOG.info(" - redirect count exceeded " + fit.url);
+ LOG.info(getName() + " " + Thread.currentThread().getId() + "
- redirect count exceeded " + fit.url);
}
output(fit.url, fit.datum, null,
ProtocolStatus.STATUS_REDIR_EXCEEDED,
@@ -473,7 +473,7 @@ public class FetcherThread extends Thread {
if (fit != null)
((FetchItemQueues) fetchQueues).finishFetchItem(fit);
activeThreads.decrementAndGet(); // count threads
- LOG.info("-finishing thread " + getName() + ", activeThreads="
+ LOG.info(getName() + " " + Thread.currentThread().getId() + " -finishing
thread " + getName() + ", activeThreads="
+ activeThreads);
}
}
@@ -577,7 +577,7 @@ public class FetcherThread extends Thread {
private void logError(Text url, String message) {
if (LOG.isInfoEnabled()) {
- LOG.info("fetch of " + url + " failed with: " + message);
+ LOG.info(getName() + " " + Thread.currentThread().getId() + " fetch of "
+ url + " failed with: " + message);
}
errors.incrementAndGet();
}
@@ -612,7 +612,7 @@ public class FetcherThread extends Thread {
scfilters.passScoreBeforeParsing(key, datum, content);
} catch (Exception e) {
if (LOG.isWarnEnabled()) {
- LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
+ LOG.warn(getName() + " " + Thread.currentThread().getId() + "
Couldn't pass score, url " + key + " (" + e + ")");
}
}
/*
@@ -625,7 +625,7 @@ public class FetcherThread extends Thread {
try {
parseResult = this.parseUtil.parse(content);
} catch (Exception e) {
- LOG.warn("Error parsing: " + key + ": "
+ LOG.warn(getName() + " " + Thread.currentThread().getId() + "
Error parsing: " + key + ": "
+ StringUtils.stringifyException(e));
}
}
@@ -657,7 +657,7 @@ public class FetcherThread extends Thread {
ParseData parseData = parse.getData();
if (!parseStatus.isSuccess()) {
- LOG.warn("Error parsing: " + key + ": " + parseStatus);
+ LOG.warn(getName() + " " + Thread.currentThread().getId() + "
Error parsing: " + key + ": " + parseStatus);
parse = parseStatus.getEmptyParse(conf);
}
@@ -678,7 +678,7 @@ public class FetcherThread extends Thread {
scfilters.passScoreAfterParsing(url, content, parse);
} catch (Exception e) {
if (LOG.isWarnEnabled()) {
- LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
+ LOG.warn(getName() + " " + Thread.currentThread().getId() + "
Couldn't pass score, url " + key + " (" + e + ")");
}
}
@@ -740,6 +740,10 @@ public class FetcherThread extends Thread {
}
// Only process depth N outlinks
if (maxOutlinkDepth > 0 && outlinkDepth < maxOutlinkDepth) {
+ FetchItem ft = FetchItem.create(url, null, queueMode);
+ FetchItemQueue queue = ((FetchItemQueues)
fetchQueues).getFetchItemQueue(ft.queueID);
+ queue.alreadyFetched.add(url.toString().hashCode());
+
reporter.incrCounter("FetcherOutlinks", "outlinks_detected",
outlinks.size());
@@ -766,13 +770,20 @@ public class FetcherThread extends Thread {
}
}
- reporter
- .incrCounter("FetcherOutlinks", "outlinks_following", 1);
-
+ // Already followed?
+ int urlHashCode = followUrl.hashCode();
+ if (queue.alreadyFetched.contains(urlHashCode)) {
+ continue;
+ }
+ queue.alreadyFetched.add(urlHashCode);
+
// Create new FetchItem with depth incremented
FetchItem fit = FetchItem.create(new Text(followUrl),
new CrawlDatum(CrawlDatum.STATUS_LINKED, interval),
queueMode, outlinkDepth + 1);
+
+ reporter
+ .incrCounter("FetcherOutlinks", "outlinks_following", 1);
((FetchItemQueues) fetchQueues).addFetchItem(fit);
outlinkCounter++;
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].