This is an automated email from the ASF dual-hosted git repository.
snagel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nutch.git
The following commit(s) were added to refs/heads/master by this push:
new 546237d  NUTCH-2627 Fetcher to optionally filter URLs - filter and normalize URLs in QueueFeeder if fetcher.filter.urls resp. fetcher.normalize.urls are true (default is false, i.e., fetcher does not filter and normalize)
new 8cf9e67  Merge pull request #370 from sebastian-nagel/NUTCH-2627-fetcher-filter-urls
546237d is described below
commit 546237d4789b2df958752a722053a89c31c24597
Author: Sebastian Nagel <[email protected]>
AuthorDate: Fri Jul 27 13:52:27 2018 +0200
NUTCH-2627 Fetcher to optionally filter URLs
- filter and normalize URLs in QueueFeeder
if fetcher.filter.urls resp. fetcher.normalize.urls are true
(default is false, i.e., fetcher does not filter and normalize)
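
For reference, the new checks are disabled out of the box; a crawl
operator opts in by overriding both properties in conf/nutch-site.xml.
A minimal override sketch (property names and defaults exactly as
introduced by this commit):

    <property>
      <name>fetcher.filter.urls</name>
      <value>true</value>
    </property>

    <property>
      <name>fetcher.normalize.urls</name>
      <value>true</value>
    </property>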
---
conf/nutch-default.xml | 12 +++
src/java/org/apache/nutch/fetcher/QueueFeeder.java | 91 ++++++++++++++++------
2 files changed, 78 insertions(+), 25 deletions(-)
diff --git a/conf/nutch-default.xml b/conf/nutch-default.xml
index a42e6a9..77ba170 100644
--- a/conf/nutch-default.xml
+++ b/conf/nutch-default.xml
@@ -1172,6 +1172,18 @@
Publisher implementation specific properties</description>
</property>
+<property>
+ <name>fetcher.filter.urls</name>
+ <value>false</value>
+  <description>Whether fetcher will filter URLs (with the configured URL
+  filters).</description>
+</property>
+
+<property>
+ <name>fetcher.normalize.urls</name>
+ <value>false</value>
+  <description>Whether fetcher will normalize URLs (with the configured URL
+  normalizers).</description>
+</property>
+
<!-- any23 plugin properties -->
<property>
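
Both switches can also be set per job: the Fetcher is launched through
Hadoop's ToolRunner, so the generic -D options override the defaults on
the command line. A usage sketch, assuming the standard bin/nutch
launcher and an existing segment:

    bin/nutch fetch -Dfetcher.filter.urls=true -Dfetcher.normalize.urls=true <segment>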
diff --git a/src/java/org/apache/nutch/fetcher/QueueFeeder.java b/src/java/org/apache/nutch/fetcher/QueueFeeder.java
index 72009ad..f5fa663 100644
--- a/src/java/org/apache/nutch/fetcher/QueueFeeder.java
+++ b/src/java/org/apache/nutch/fetcher/QueueFeeder.java
@@ -18,10 +18,15 @@ package org.apache.nutch.fetcher;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
+import java.net.MalformedURLException;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.fetcher.Fetcher.FetcherRun;
+import org.apache.nutch.net.URLFilterException;
+import org.apache.nutch.net.URLFilters;
+import org.apache.nutch.net.URLNormalizers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,6 +43,9 @@ public class QueueFeeder extends Thread {
private FetchItemQueues queues;
private int size;
private long timelimit = -1;
+ private URLFilters urlFilters = null;
+ private URLNormalizers urlNormalizers = null;
+ private String urlNormalizerScope = URLNormalizers.SCOPE_DEFAULT;
public QueueFeeder(FetcherRun.Context context,
FetchItemQueues queues, int size) {
@@ -46,20 +54,43 @@ public class QueueFeeder extends Thread {
this.size = size;
this.setDaemon(true);
this.setName("QueueFeeder");
+ Configuration conf = context.getConfiguration();
+ if (conf.getBoolean("fetcher.filter.urls", false)) {
+ urlFilters = new URLFilters(conf);
+ }
+ if (conf.getBoolean("fetcher.normalize.urls", false)) {
+ urlNormalizers = new URLNormalizers(conf, urlNormalizerScope);
+ }
}
public void setTimeLimit(long tl) {
timelimit = tl;
}
+ /** Filter and normalize the url */
+ private String filterNormalize(String url) {
+ if (url != null) {
+ try {
+ if (urlNormalizers != null)
+ url = urlNormalizers.normalize(url, urlNormalizerScope); // normalize the url
+ if (urlFilters != null)
+ url = urlFilters.filter(url);
+ } catch (MalformedURLException | URLFilterException e) {
+ LOG.warn("Skipping {}: {}", url, e);
+ url = null;
+ }
+ }
+ return url;
+ }
+
public void run() {
boolean hasMore = true;
int cnt = 0;
int timelimitcount = 0;
while (hasMore) {
if (System.currentTimeMillis() >= timelimit && timelimit != -1) {
- // enough .. lets' simply
- // read all the entries from the input without processing them
+ // enough ... lets' simply read all the entries from the input without
+ // processing them
try {
hasMore = context.nextKeyValue();
timelimitcount++;
@@ -77,33 +108,43 @@ public class QueueFeeder extends Thread {
// queues are full - spin-wait until they have some free space
try {
Thread.sleep(1000);
- } catch (Exception e) {
+ } catch (InterruptedException e) {
}
- ;
continue;
- } else {
- LOG.debug("-feeding {} input urls ...", feed);
- while (feed > 0 && hasMore) {
- try {
- hasMore = context.nextKeyValue();
- if (hasMore) {
- /*
- * Need to copy key and value objects because MapReduce will reuse
- * the original objects while the objects are stored in the queue.
- */
- Text url = new Text((Text)context.getCurrentKey());
- CrawlDatum datum = new CrawlDatum();
- datum.set((CrawlDatum)context.getCurrentValue());
- queues.addFetchItem(url, datum);
- cnt++;
- feed--;
+ }
+ LOG.debug("-feeding {} input urls ...", feed);
+ while (feed > 0 && hasMore) {
+ try {
+ hasMore = context.nextKeyValue();
+ if (hasMore) {
+ Text url = context.getCurrentKey();
+ if (urlFilters != null || urlNormalizers != null) {
+ String u = filterNormalize(url.toString());
+ if (u == null) {
+ // filtered or failed to normalize
+ context.getCounter("FetcherStatus", "filtered").increment(1);
+ continue;
+ }
+ url = new Text(u);
}
- } catch (IOException e) {
- LOG.error("QueueFeeder error reading input, record " + cnt, e);
- return;
- } catch (InterruptedException e) {
- LOG.info("QueueFeeder interrupted, exception:", e);
+ /*
+ * Need to copy key and value objects because MapReduce will reuse
+ * the original objects while the objects are stored in the queue.
+ */
+ else {
+ url = new Text(url);
+ }
+ CrawlDatum datum = new CrawlDatum();
+ datum.set((CrawlDatum) context.getCurrentValue());
+ queues.addFetchItem(url, datum);
+ cnt++;
+ feed--;
}
+ } catch (IOException e) {
+ LOG.error("QueueFeeder error reading input, record " + cnt, e);
+ return;
+ } catch (InterruptedException e) {
+ LOG.info("QueueFeeder interrupted, exception:", e);
}
}
}
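
To verify how a given fetcher.filter.urls / fetcher.normalize.urls
configuration treats a URL without running a fetch, the logic of
filterNormalize() above can be reproduced in a small stand-alone
harness. This is a minimal sketch (the class name CheckUrl is
hypothetical; it relies only on the URLFilters/URLNormalizers calls
already used in the patch and on NutchConfiguration from
org.apache.nutch.util):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.nutch.net.URLFilters;
    import org.apache.nutch.net.URLNormalizers;
    import org.apache.nutch.util.NutchConfiguration;

    public class CheckUrl {
      public static void main(String[] args) throws Exception {
        // Loads nutch-default.xml / nutch-site.xml, i.e. the configured
        // URL filter and normalizer plugins.
        Configuration conf = NutchConfiguration.create();
        URLNormalizers normalizers =
            new URLNormalizers(conf, URLNormalizers.SCOPE_DEFAULT);
        URLFilters filters = new URLFilters(conf);
        for (String url : args) {
          // Same order as QueueFeeder.filterNormalize(): normalize
          // first, then filter; null means the URL would be skipped.
          String u = normalizers.normalize(url, URLNormalizers.SCOPE_DEFAULT);
          if (u != null) {
            u = filters.filter(u);
          }
          System.out.println(url + " -> " + u);
        }
      }
    }

Nutch also ships interactive checkers (bin/nutch filterchecker and
bin/nutch normalizerchecker) which exercise the same plugin chains.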