Author: lewismc
Date: Fri May 8 04:25:05 2015
New Revision: 1678281
URL: http://svn.apache.org/r1678281
Log:
NUTCH-1934 Refactor Fetcher in trunk
Added:
nutch/trunk/src/java/org/apache/nutch/fetcher/FetchItem.java
nutch/trunk/src/java/org/apache/nutch/fetcher/FetchItemQueue.java
nutch/trunk/src/java/org/apache/nutch/fetcher/FetchItemQueues.java
nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherThread.java
nutch/trunk/src/java/org/apache/nutch/fetcher/QueueFeeder.java
Modified:
nutch/trunk/CHANGES.txt
nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java
Modified: nutch/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/nutch/trunk/CHANGES.txt?rev=1678281&r1=1678280&r2=1678281&view=diff
==============================================================================
--- nutch/trunk/CHANGES.txt (original)
+++ nutch/trunk/CHANGES.txt Fri May 8 04:25:05 2015
@@ -2,6 +2,8 @@ Nutch Change Log
Nutch Current Development 1.11-SNAPSHOT
+* NUTCH-1934 Refactor Fetcher in trunk (lewismc)
+
* NUTCH-2004 ParseChecker does not handle redirects (mjoyce via lewismc)
Nutch 1.10 Release - 29/04/2015 (dd/mm/yyyy)
Added: nutch/trunk/src/java/org/apache/nutch/fetcher/FetchItem.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/fetcher/FetchItem.java?rev=1678281&view=auto
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/fetcher/FetchItem.java (added)
+++ nutch/trunk/src/java/org/apache/nutch/fetcher/FetchItem.java Fri May 8 04:25:05 2015
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.fetcher;
+
+import java.net.InetAddress;
+import java.net.URL;
+import java.net.UnknownHostException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.util.URLUtil;
+import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
+
+/**
+ * This class describes the item to be fetched.
+ */
+public class FetchItem {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FetchItem.class);
+
+ int outlinkDepth = 0;
+ String queueID;
+ Text url;
+ URL u;
+ CrawlDatum datum;
+
+ public FetchItem(Text url, URL u, CrawlDatum datum, String queueID) {
+ this(url, u, datum, queueID, 0);
+ }
+
+ public FetchItem(Text url, URL u, CrawlDatum datum, String queueID,
+ int outlinkDepth) {
+ this.url = url;
+ this.u = u;
+ this.datum = datum;
+ this.queueID = queueID;
+ this.outlinkDepth = outlinkDepth;
+ }
+
+ /**
+ * Create an item. The queue ID is derived from the <code>queueMode</code>
+ * argument, either as a protocol + hostname, protocol + IP address, or
+ * protocol + domain pair.
+ */
+ public static FetchItem create(Text url, CrawlDatum datum, String queueMode) {
+ return create(url, datum, queueMode, 0);
+ }
+
+ public static FetchItem create(Text url, CrawlDatum datum,
+ String queueMode, int outlinkDepth) {
+ String queueID;
+ URL u = null;
+ try {
+ u = new URL(url.toString());
+ } catch (Exception e) {
+ LOG.warn("Cannot parse url: " + url, e);
+ return null;
+ }
+ final String proto = u.getProtocol().toLowerCase();
+ String key;
+ if (FetchItemQueues.QUEUE_MODE_IP.equalsIgnoreCase(queueMode)) {
+ try {
+ final InetAddress addr = InetAddress.getByName(u.getHost());
+ key = addr.getHostAddress();
+ } catch (final UnknownHostException e) {
+ // unable to resolve it, so don't fall back to host name
+ LOG.warn("Unable to resolve: " + u.getHost() + ", skipping.");
+ return null;
+ }
+ } else if (FetchItemQueues.QUEUE_MODE_DOMAIN.equalsIgnoreCase(queueMode)) {
+ key = URLUtil.getDomainName(u);
+ if (key == null) {
+ LOG.warn("Unknown domain for url: " + url
+ + ", using URL string as key");
+ key = u.toExternalForm();
+ }
+ } else {
+ key = u.getHost();
+ if (key == null) {
+ LOG.warn("Unknown host for url: " + url + ", using URL string as key");
+ key = u.toExternalForm();
+ }
+ }
+ queueID = proto + "://" + key.toLowerCase();
+ return new FetchItem(url, u, datum, queueID, outlinkDepth);
+ }
+
+ public CrawlDatum getDatum() {
+ return datum;
+ }
+
+ public String getQueueID() {
+ return queueID;
+ }
+
+ public Text getUrl() {
+ return url;
+ }
+
+ public URL getURL2() {
+ return u;
+ }
+}
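For reference, the queue-ID partitioning above can be exercised directly. The following is a minimal sketch, assuming the Nutch and Hadoop jars are on the classpath; the example.org URLs and the demo class itself are hypothetical, made up for illustration:

import org.apache.hadoop.io.Text;
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.fetcher.FetchItem;
import org.apache.nutch.fetcher.FetchItemQueues;

public class FetchItemDemo {
  public static void main(String[] args) {
    Text url = new Text("http://news.example.org/page.html");
    CrawlDatum datum = new CrawlDatum();

    // byHost keys the queue on protocol + hostname
    FetchItem byHost = FetchItem.create(url, datum, FetchItemQueues.QUEUE_MODE_HOST);
    // byDomain keys it on protocol + domain (via URLUtil.getDomainName)
    FetchItem byDomain = FetchItem.create(url, datum, FetchItemQueues.QUEUE_MODE_DOMAIN);

    // create() returns null for unparseable URLs (or unresolvable hosts in byIP mode)
    if (byHost != null && byDomain != null) {
      System.out.println(byHost.getQueueID());   // http://news.example.org
      System.out.println(byDomain.getQueueID()); // http://example.org
    }
  }
}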
Added: nutch/trunk/src/java/org/apache/nutch/fetcher/FetchItemQueue.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/fetcher/FetchItemQueue.java?rev=1678281&view=auto
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/fetcher/FetchItemQueue.java (added)
+++ nutch/trunk/src/java/org/apache/nutch/fetcher/FetchItemQueue.java Fri May 8 04:25:05 2015
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.fetcher;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class handles FetchItems which come from the same host ID (be it a
+ * proto/hostname or proto/IP pair). It also keeps track of requests in
+ * progress and elapsed time between requests.
+ */
+public class FetchItemQueue {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FetchItemQueue.class);
+
+ List<FetchItem> queue = Collections
+ .synchronizedList(new LinkedList<FetchItem>());
+ AtomicInteger inProgress = new AtomicInteger();
+ AtomicLong nextFetchTime = new AtomicLong();
+ AtomicInteger exceptionCounter = new AtomicInteger();
+ long crawlDelay;
+ long minCrawlDelay;
+ int maxThreads;
+ Configuration conf;
+
+ public FetchItemQueue(Configuration conf, int maxThreads, long crawlDelay,
+ long minCrawlDelay) {
+ this.conf = conf;
+ this.maxThreads = maxThreads;
+ this.crawlDelay = crawlDelay;
+ this.minCrawlDelay = minCrawlDelay;
+ // ready to start
+ setEndTime(System.currentTimeMillis() - crawlDelay);
+ }
+
+ public synchronized int emptyQueue() {
+ int presize = queue.size();
+ queue.clear();
+ return presize;
+ }
+
+ public int getQueueSize() {
+ return queue.size();
+ }
+
+ public int getInProgressSize() {
+ return inProgress.get();
+ }
+
+ public int incrementExceptionCounter() {
+ return exceptionCounter.incrementAndGet();
+ }
+
+ public void finishFetchItem(FetchItem it, boolean asap) {
+ if (it != null) {
+ inProgress.decrementAndGet();
+ setEndTime(System.currentTimeMillis(), asap);
+ }
+ }
+
+ public void addFetchItem(FetchItem it) {
+ if (it == null)
+ return;
+ queue.add(it);
+ }
+
+ public void addInProgressFetchItem(FetchItem it) {
+ if (it == null)
+ return;
+ inProgress.incrementAndGet();
+ }
+
+ public FetchItem getFetchItem() {
+ if (inProgress.get() >= maxThreads)
+ return null;
+ long now = System.currentTimeMillis();
+ if (nextFetchTime.get() > now)
+ return null;
+ FetchItem it = null;
+ if (queue.size() == 0)
+ return null;
+ try {
+ it = queue.remove(0);
+ inProgress.incrementAndGet();
+ } catch (Exception e) {
+ LOG.error(
+ "Cannot remove FetchItem from queue or cannot add it to inProgress
queue",
+ e);
+ }
+ return it;
+ }
+
+ public synchronized void dump() {
+ LOG.info(" maxThreads = " + maxThreads);
+ LOG.info(" inProgress = " + inProgress.get());
+ LOG.info(" crawlDelay = " + crawlDelay);
+ LOG.info(" minCrawlDelay = " + minCrawlDelay);
+ LOG.info(" nextFetchTime = " + nextFetchTime.get());
+ LOG.info(" now = " + System.currentTimeMillis());
+ for (int i = 0; i < queue.size(); i++) {
+ FetchItem it = queue.get(i);
+ LOG.info(" " + i + ". " + it.url);
+ }
+ }
+
+ private void setEndTime(long endTime) {
+ setEndTime(endTime, false);
+ }
+
+ private void setEndTime(long endTime, boolean asap) {
+ if (!asap)
+ nextFetchTime.set(endTime
+ + (maxThreads > 1 ? minCrawlDelay : crawlDelay));
+ else
+ nextFetchTime.set(endTime);
+ }
+}
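The nextFetchTime bookkeeping above is what enforces per-queue politeness: getFetchItem() returns null while maxThreads requests are in flight, or while the crawl delay since the last finished fetch has not yet elapsed. A minimal sketch of that behavior, under the same classpath assumption and with a hypothetical example.org queue:

import java.net.URL;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.fetcher.FetchItem;
import org.apache.nutch.fetcher.FetchItemQueue;

public class FetchItemQueueDemo {
  public static void main(String[] args) throws Exception {
    // one fetch at a time, 2000 ms crawl delay, no minimum delay
    FetchItemQueue fiq = new FetchItemQueue(new Configuration(), 1, 2000L, 0L);

    Text url = new Text("http://example.org/a.html");
    fiq.addFetchItem(new FetchItem(url, new URL(url.toString()),
        new CrawlDatum(), "http://example.org"));

    FetchItem first = fiq.getFetchItem(); // handed out immediately
    fiq.finishFetchItem(first, false);    // stamps nextFetchTime = now + crawlDelay

    fiq.addFetchItem(new FetchItem(url, new URL(url.toString()),
        new CrawlDatum(), "http://example.org"));
    System.out.println(fiq.getFetchItem()); // null: still inside the 2 s delay
    Thread.sleep(2100);
    System.out.println(fiq.getFetchItem()); // the queued item, delay elapsed
  }
}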
Added: nutch/trunk/src/java/org/apache/nutch/fetcher/FetchItemQueues.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/fetcher/FetchItemQueues.java?rev=1678281&view=auto
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/fetcher/FetchItemQueues.java (added)
+++ nutch/trunk/src/java/org/apache/nutch/fetcher/FetchItemQueues.java Fri May 8 04:25:05 2015
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.fetcher;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Convenience class - a collection of queues that keeps track of the total
+ * number of items, and provides items eligible for fetching from any queue.
+ */
+public class FetchItemQueues {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FetchItemQueues.class);
+
+ public static final String DEFAULT_ID = "default";
+ Map<String, FetchItemQueue> queues = new HashMap<String, FetchItemQueue>();
+ AtomicInteger totalSize = new AtomicInteger(0);
+ int maxThreads;
+ long crawlDelay;
+ long minCrawlDelay;
+ long timelimit = -1;
+ int maxExceptionsPerQueue = -1;
+ Configuration conf;
+
+ public static final String QUEUE_MODE_HOST = "byHost";
+ public static final String QUEUE_MODE_DOMAIN = "byDomain";
+ public static final String QUEUE_MODE_IP = "byIP";
+
+ String queueMode;
+
+ public FetchItemQueues(Configuration conf) {
+ this.conf = conf;
+ this.maxThreads = conf.getInt("fetcher.threads.per.queue", 1);
+ queueMode = conf.get("fetcher.queue.mode", QUEUE_MODE_HOST);
+ // check that the mode is known
+ if (!queueMode.equals(QUEUE_MODE_IP)
+ && !queueMode.equals(QUEUE_MODE_DOMAIN)
+ && !queueMode.equals(QUEUE_MODE_HOST)) {
+ LOG.error("Unknown partition mode : " + queueMode
+ + " - forcing to byHost");
+ queueMode = QUEUE_MODE_HOST;
+ }
+ LOG.info("Using queue mode : " + queueMode);
+
+ this.crawlDelay = (long) (conf.getFloat("fetcher.server.delay", 1.0f) * 1000);
+ this.minCrawlDelay = (long) (conf.getFloat("fetcher.server.min.delay",
+ 0.0f) * 1000);
+ this.timelimit = conf.getLong("fetcher.timelimit", -1);
+ this.maxExceptionsPerQueue = conf.getInt(
+ "fetcher.max.exceptions.per.queue", -1);
+ }
+
+ public int getTotalSize() {
+ return totalSize.get();
+ }
+
+ public int getQueueCount() {
+ return queues.size();
+ }
+
+ public void addFetchItem(Text url, CrawlDatum datum) {
+ FetchItem it = FetchItem.create(url, datum, queueMode);
+ if (it != null)
+ addFetchItem(it);
+ }
+
+ public synchronized void addFetchItem(FetchItem it) {
+ FetchItemQueue fiq = getFetchItemQueue(it.queueID);
+ fiq.addFetchItem(it);
+ totalSize.incrementAndGet();
+ }
+
+ public void finishFetchItem(FetchItem it) {
+ finishFetchItem(it, false);
+ }
+
+ public void finishFetchItem(FetchItem it, boolean asap) {
+ FetchItemQueue fiq = queues.get(it.queueID);
+ if (fiq == null) {
+ LOG.warn("Attempting to finish item from unknown queue: " + it);
+ return;
+ }
+ fiq.finishFetchItem(it, asap);
+ }
+
+ public synchronized FetchItemQueue getFetchItemQueue(String id) {
+ FetchItemQueue fiq = queues.get(id);
+ if (fiq == null) {
+ // initialize queue
+ fiq = new FetchItemQueue(conf, maxThreads, crawlDelay, minCrawlDelay);
+ queues.put(id, fiq);
+ }
+ return fiq;
+ }
+
+ public synchronized FetchItem getFetchItem() {
+ Iterator<Map.Entry<String, FetchItemQueue>> it = queues.entrySet()
+ .iterator();
+ while (it.hasNext()) {
+ FetchItemQueue fiq = it.next().getValue();
+ // reap empty queues
+ if (fiq.getQueueSize() == 0 && fiq.getInProgressSize() == 0) {
+ it.remove();
+ continue;
+ }
+ FetchItem fit = fiq.getFetchItem();
+ if (fit != null) {
+ totalSize.decrementAndGet();
+ return fit;
+ }
+ }
+ return null;
+ }
+
+ // called only once the feeder has stopped
+ public synchronized int checkTimelimit() {
+ int count = 0;
+
+ if (System.currentTimeMillis() >= timelimit && timelimit != -1) {
+ // emptying the queues
+ count = emptyQueues();
+
+ // there might also be a case where totalsize !=0 but number of queues
+ // == 0
+ // in which case we simply force it to 0 to avoid blocking
+ if (totalSize.get() != 0 && queues.size() == 0)
+ totalSize.set(0);
+ }
+ return count;
+ }
+
+ // empties the queues (used by timebomb and throughput threshold)
+ public synchronized int emptyQueues() {
+ int count = 0;
+
+ for (String id : queues.keySet()) {
+ FetchItemQueue fiq = queues.get(id);
+ if (fiq.getQueueSize() == 0)
+ continue;
+ LOG.info("* queue: " + id + " >> dropping! ");
+ int deleted = fiq.emptyQueue();
+ for (int i = 0; i < deleted; i++) {
+ totalSize.decrementAndGet();
+ }
+ count += deleted;
+ }
+
+ return count;
+ }
+
+ /**
+ * Increment the exception counter of a queue in case of an exception, e.g.
+ * a timeout; once the counter exceeds a configured threshold, the queue is
+ * emptied.
+ *
+ * @param queueid the ID of the affected queue
+ * @return number of purged items
+ */
+ public synchronized int checkExceptionThreshold(String queueid) {
+ FetchItemQueue fiq = queues.get(queueid);
+ if (fiq == null) {
+ return 0;
+ }
+ if (fiq.getQueueSize() == 0) {
+ return 0;
+ }
+ int excCount = fiq.incrementExceptionCounter();
+ if (maxExceptionsPerQueue != -1 && excCount >= maxExceptionsPerQueue) {
+ // too many exceptions for items in this queue - purge it
+ int deleted = fiq.emptyQueue();
+ LOG.info("* queue: " + queueid + " >> removed " + deleted
+ + " URLs from queue because " + excCount + " exceptions occurred");
+ for (int i = 0; i < deleted; i++) {
+ totalSize.decrementAndGet();
+ }
+ return deleted;
+ }
+ return 0;
+ }
+
+ public synchronized void dump() {
+ for (String id : queues.keySet()) {
+ FetchItemQueue fiq = queues.get(id);
+ if (fiq.getQueueSize() == 0)
+ continue;
+ LOG.info("* queue: " + id);
+ fiq.dump();
+ }
+ }
+}
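checkExceptionThreshold() above acts as a per-queue circuit breaker: once fetcher.max.exceptions.per.queue exceptions have accumulated against a queue, all of its pending items are dropped in a single purge. A minimal sketch, with hypothetical example.org URLs and the default byHost queue mode assumed:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.fetcher.FetchItemQueues;

public class ExceptionThresholdDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInt("fetcher.max.exceptions.per.queue", 2); // purge after 2 exceptions
    FetchItemQueues queues = new FetchItemQueues(conf);

    for (String path : new String[] { "/a", "/b", "/c" }) {
      queues.addFetchItem(new Text("http://example.org" + path), new CrawlDatum());
    }
    System.out.println(queues.getTotalSize()); // 3, all in queue "http://example.org"

    String queueId = "http://example.org";
    System.out.println(queues.checkExceptionThreshold(queueId)); // 0: first exception
    System.out.println(queues.checkExceptionThreshold(queueId)); // 3: threshold hit, purged
    System.out.println(queues.getTotalSize());                   // 0
  }
}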
Modified: nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java?rev=1678281&r1=1678280&r2=1678281&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java Fri May 8 04:25:05 2015
@@ -18,28 +18,14 @@ package org.apache.nutch.fetcher;
import java.io.File;
import java.io.IOException;
-import java.net.InetAddress;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.util.*;
-import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-
-
-
-
-
-
-
-
-
-// Slf4j Logging imports
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.hadoop.io.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
@@ -49,18 +35,10 @@ import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.crawl.NutchWritable;
-import org.apache.nutch.crawl.SignatureFactory;
-import org.apache.nutch.metadata.Metadata;
import org.apache.nutch.metadata.Nutch;
-import org.apache.nutch.net.*;
import org.apache.nutch.protocol.*;
-import org.apache.nutch.parse.*;
-import org.apache.nutch.scoring.ScoringFilterException;
-import org.apache.nutch.scoring.ScoringFilters;
import org.apache.nutch.util.*;
-import crawlercommons.robots.BaseRobotRules;
-
/**
* A queue-based fetcher.
*
@@ -146,1058 +124,6 @@ public class Fetcher extends NutchTool i
LinkedList<FetcherThread> fetcherThreads = new LinkedList<FetcherThread>();
- /**
- * This class described the item to be fetched.
- */
- private static class FetchItem {
- int outlinkDepth = 0;
- String queueID;
- Text url;
- URL u;
- CrawlDatum datum;
-
- public FetchItem(Text url, URL u, CrawlDatum datum, String queueID) {
- this(url, u, datum, queueID, 0);
- }
-
- public FetchItem(Text url, URL u, CrawlDatum datum, String queueID,
- int outlinkDepth) {
- this.url = url;
- this.u = u;
- this.datum = datum;
- this.queueID = queueID;
- this.outlinkDepth = outlinkDepth;
- }
-
- /**
- * Create an item. Queue id will be created based on <code>queueMode</code>
- * argument, either as a protocol + hostname pair, protocol + IP address
- * pair or protocol+domain pair.
- */
- public static FetchItem create(Text url, CrawlDatum datum, String queueMode) {
- return create(url, datum, queueMode, 0);
- }
-
- public static FetchItem create(Text url, CrawlDatum datum,
- String queueMode, int outlinkDepth) {
- String queueID;
- URL u = null;
- try {
- u = new URL(url.toString());
- } catch (Exception e) {
- LOG.warn("Cannot parse url: " + url, e);
- return null;
- }
- final String proto = u.getProtocol().toLowerCase();
- String key;
- if (FetchItemQueues.QUEUE_MODE_IP.equalsIgnoreCase(queueMode)) {
- try {
- final InetAddress addr = InetAddress.getByName(u.getHost());
- key = addr.getHostAddress();
- } catch (final UnknownHostException e) {
- // unable to resolve it, so don't fall back to host name
- LOG.warn("Unable to resolve: " + u.getHost() + ", skipping.");
- return null;
- }
- } else if (FetchItemQueues.QUEUE_MODE_DOMAIN.equalsIgnoreCase(queueMode)) {
- key = URLUtil.getDomainName(u);
- if (key == null) {
- LOG.warn("Unknown domain for url: " + url
- + ", using URL string as key");
- key = u.toExternalForm();
- }
- } else {
- key = u.getHost();
- if (key == null) {
- LOG.warn("Unknown host for url: " + url + ", using URL string as
key");
- key = u.toExternalForm();
- }
- }
- queueID = proto + "://" + key.toLowerCase();
- return new FetchItem(url, u, datum, queueID, outlinkDepth);
- }
-
- public CrawlDatum getDatum() {
- return datum;
- }
-
- public String getQueueID() {
- return queueID;
- }
-
- public Text getUrl() {
- return url;
- }
-
- public URL getURL2() {
- return u;
- }
- }
-
- /**
- * This class handles FetchItems which come from the same host ID (be it a
- * proto/hostname or proto/IP pair). It also keeps track of requests in
- * progress and elapsed time between requests.
- */
- private static class FetchItemQueue {
- List<FetchItem> queue = Collections
- .synchronizedList(new LinkedList<FetchItem>());
- AtomicInteger inProgress = new AtomicInteger();
- AtomicLong nextFetchTime = new AtomicLong();
- AtomicInteger exceptionCounter = new AtomicInteger();
- long crawlDelay;
- long minCrawlDelay;
- int maxThreads;
- Configuration conf;
-
- public FetchItemQueue(Configuration conf, int maxThreads, long crawlDelay,
- long minCrawlDelay) {
- this.conf = conf;
- this.maxThreads = maxThreads;
- this.crawlDelay = crawlDelay;
- this.minCrawlDelay = minCrawlDelay;
- // ready to start
- setEndTime(System.currentTimeMillis() - crawlDelay);
- }
-
- public synchronized int emptyQueue() {
- int presize = queue.size();
- queue.clear();
- return presize;
- }
-
- public int getQueueSize() {
- return queue.size();
- }
-
- public int getInProgressSize() {
- return inProgress.get();
- }
-
- public int incrementExceptionCounter() {
- return exceptionCounter.incrementAndGet();
- }
-
- public void finishFetchItem(FetchItem it, boolean asap) {
- if (it != null) {
- inProgress.decrementAndGet();
- setEndTime(System.currentTimeMillis(), asap);
- }
- }
-
- public void addFetchItem(FetchItem it) {
- if (it == null)
- return;
- queue.add(it);
- }
-
- public void addInProgressFetchItem(FetchItem it) {
- if (it == null)
- return;
- inProgress.incrementAndGet();
- }
-
- public FetchItem getFetchItem() {
- if (inProgress.get() >= maxThreads)
- return null;
- long now = System.currentTimeMillis();
- if (nextFetchTime.get() > now)
- return null;
- FetchItem it = null;
- if (queue.size() == 0)
- return null;
- try {
- it = queue.remove(0);
- inProgress.incrementAndGet();
- } catch (Exception e) {
- LOG.error(
- "Cannot remove FetchItem from queue or cannot add it to inProgress
queue",
- e);
- }
- return it;
- }
-
- public synchronized void dump() {
- LOG.info(" maxThreads = " + maxThreads);
- LOG.info(" inProgress = " + inProgress.get());
- LOG.info(" crawlDelay = " + crawlDelay);
- LOG.info(" minCrawlDelay = " + minCrawlDelay);
- LOG.info(" nextFetchTime = " + nextFetchTime.get());
- LOG.info(" now = " + System.currentTimeMillis());
- for (int i = 0; i < queue.size(); i++) {
- FetchItem it = queue.get(i);
- LOG.info(" " + i + ". " + it.url);
- }
- }
-
- private void setEndTime(long endTime) {
- setEndTime(endTime, false);
- }
-
- private void setEndTime(long endTime, boolean asap) {
- if (!asap)
- nextFetchTime.set(endTime
- + (maxThreads > 1 ? minCrawlDelay : crawlDelay));
- else
- nextFetchTime.set(endTime);
- }
- }
-
- /**
- * Convenience class - a collection of queues that keeps track of the total
- * number of items, and provides items eligible for fetching from any queue.
- */
- private static class FetchItemQueues {
- public static final String DEFAULT_ID = "default";
- Map<String, FetchItemQueue> queues = new HashMap<String, FetchItemQueue>();
- AtomicInteger totalSize = new AtomicInteger(0);
- int maxThreads;
- long crawlDelay;
- long minCrawlDelay;
- long timelimit = -1;
- int maxExceptionsPerQueue = -1;
- Configuration conf;
-
- public static final String QUEUE_MODE_HOST = "byHost";
- public static final String QUEUE_MODE_DOMAIN = "byDomain";
- public static final String QUEUE_MODE_IP = "byIP";
-
- String queueMode;
-
- public FetchItemQueues(Configuration conf) {
- this.conf = conf;
- this.maxThreads = conf.getInt("fetcher.threads.per.queue", 1);
- queueMode = conf.get("fetcher.queue.mode", QUEUE_MODE_HOST);
- // check that the mode is known
- if (!queueMode.equals(QUEUE_MODE_IP)
- && !queueMode.equals(QUEUE_MODE_DOMAIN)
- && !queueMode.equals(QUEUE_MODE_HOST)) {
- LOG.error("Unknown partition mode : " + queueMode
- + " - forcing to byHost");
- queueMode = QUEUE_MODE_HOST;
- }
- LOG.info("Using queue mode : " + queueMode);
-
- this.crawlDelay = (long) (conf.getFloat("fetcher.server.delay", 1.0f) * 1000);
- this.minCrawlDelay = (long) (conf.getFloat("fetcher.server.min.delay",
- 0.0f) * 1000);
- this.timelimit = conf.getLong("fetcher.timelimit", -1);
- this.maxExceptionsPerQueue = conf.getInt(
- "fetcher.max.exceptions.per.queue", -1);
- }
-
- public int getTotalSize() {
- return totalSize.get();
- }
-
- public int getQueueCount() {
- return queues.size();
- }
-
- public void addFetchItem(Text url, CrawlDatum datum) {
- FetchItem it = FetchItem.create(url, datum, queueMode);
- if (it != null)
- addFetchItem(it);
- }
-
- public synchronized void addFetchItem(FetchItem it) {
- FetchItemQueue fiq = getFetchItemQueue(it.queueID);
- fiq.addFetchItem(it);
- totalSize.incrementAndGet();
- }
-
- public void finishFetchItem(FetchItem it) {
- finishFetchItem(it, false);
- }
-
- public void finishFetchItem(FetchItem it, boolean asap) {
- FetchItemQueue fiq = queues.get(it.queueID);
- if (fiq == null) {
- LOG.warn("Attempting to finish item from unknown queue: " + it);
- return;
- }
- fiq.finishFetchItem(it, asap);
- }
-
- public synchronized FetchItemQueue getFetchItemQueue(String id) {
- FetchItemQueue fiq = queues.get(id);
- if (fiq == null) {
- // initialize queue
- fiq = new FetchItemQueue(conf, maxThreads, crawlDelay, minCrawlDelay);
- queues.put(id, fiq);
- }
- return fiq;
- }
-
- public synchronized FetchItem getFetchItem() {
- Iterator<Map.Entry<String, FetchItemQueue>> it = queues.entrySet()
- .iterator();
- while (it.hasNext()) {
- FetchItemQueue fiq = it.next().getValue();
- // reap empty queues
- if (fiq.getQueueSize() == 0 && fiq.getInProgressSize() == 0) {
- it.remove();
- continue;
- }
- FetchItem fit = fiq.getFetchItem();
- if (fit != null) {
- totalSize.decrementAndGet();
- return fit;
- }
- }
- return null;
- }
-
- // called only once the feeder has stopped
- public synchronized int checkTimelimit() {
- int count = 0;
-
- if (System.currentTimeMillis() >= timelimit && timelimit != -1) {
- // emptying the queues
- count = emptyQueues();
-
- // there might also be a case where totalsize !=0 but number of queues
- // == 0
- // in which case we simply force it to 0 to avoid blocking
- if (totalSize.get() != 0 && queues.size() == 0)
- totalSize.set(0);
- }
- return count;
- }
-
- // empties the queues (used by timebomb and throughput threshold)
- public synchronized int emptyQueues() {
- int count = 0;
-
- for (String id : queues.keySet()) {
- FetchItemQueue fiq = queues.get(id);
- if (fiq.getQueueSize() == 0)
- continue;
- LOG.info("* queue: " + id + " >> dropping! ");
- int deleted = fiq.emptyQueue();
- for (int i = 0; i < deleted; i++) {
- totalSize.decrementAndGet();
- }
- count += deleted;
- }
-
- return count;
- }
-
- /**
- * Increment the exception counter of a queue in case of an exception e.g.
- * timeout; when higher than a given threshold simply empty the queue.
- *
- * @param queueid
- * @return number of purged items
- */
- public synchronized int checkExceptionThreshold(String queueid) {
- FetchItemQueue fiq = queues.get(queueid);
- if (fiq == null) {
- return 0;
- }
- if (fiq.getQueueSize() == 0) {
- return 0;
- }
- int excCount = fiq.incrementExceptionCounter();
- if (maxExceptionsPerQueue != -1 && excCount >= maxExceptionsPerQueue) {
- // too many exceptions for items in this queue - purge it
- int deleted = fiq.emptyQueue();
- LOG.info("* queue: " + queueid + " >> removed " + deleted
- + " URLs from queue because " + excCount + " exceptions occurred");
- for (int i = 0; i < deleted; i++) {
- totalSize.decrementAndGet();
- }
- return deleted;
- }
- return 0;
- }
-
- public synchronized void dump() {
- for (String id : queues.keySet()) {
- FetchItemQueue fiq = queues.get(id);
- if (fiq.getQueueSize() == 0)
- continue;
- LOG.info("* queue: " + id);
- fiq.dump();
- }
- }
- }
-
- /**
- * This class feeds the queues with input items, and re-fills them as items
- * are consumed by FetcherThread-s.
- */
- private static class QueueFeeder extends Thread {
- private RecordReader<Text, CrawlDatum> reader;
- private FetchItemQueues queues;
- private int size;
- private long timelimit = -1;
-
- public QueueFeeder(RecordReader<Text, CrawlDatum> reader,
- FetchItemQueues queues, int size) {
- this.reader = reader;
- this.queues = queues;
- this.size = size;
- this.setDaemon(true);
- this.setName("QueueFeeder");
- }
-
- public void setTimeLimit(long tl) {
- timelimit = tl;
- }
-
- public void run() {
- boolean hasMore = true;
- int cnt = 0;
- int timelimitcount = 0;
- while (hasMore) {
- if (System.currentTimeMillis() >= timelimit && timelimit != -1) {
- // enough .. lets' simply
- // read all the entries from the input without processing them
- try {
- Text url = new Text();
- CrawlDatum datum = new CrawlDatum();
- hasMore = reader.next(url, datum);
- timelimitcount++;
- } catch (IOException e) {
- LOG.error("QueueFeeder error reading input, record " + cnt, e);
- return;
- }
- continue;
- }
- int feed = size - queues.getTotalSize();
- if (feed <= 0) {
- // queues are full - spin-wait until they have some free space
- try {
- Thread.sleep(1000);
- } catch (Exception e) {
- }
- ;
- continue;
- } else {
- LOG.debug("-feeding " + feed + " input urls ...");
- while (feed > 0 && hasMore) {
- try {
- Text url = new Text();
- CrawlDatum datum = new CrawlDatum();
- hasMore = reader.next(url, datum);
- if (hasMore) {
- queues.addFetchItem(url, datum);
- cnt++;
- feed--;
- }
- } catch (IOException e) {
- LOG.error("QueueFeeder error reading input, record " + cnt, e);
- return;
- }
- }
- }
- }
- LOG.info("QueueFeeder finished: total " + cnt
- + " records + hit by time limit :" + timelimitcount);
- }
- }
-
- /**
- * This class picks items from queues and fetches the pages.
- */
- private class FetcherThread extends Thread {
- private Configuration conf;
- private URLFilters urlFilters;
- private ScoringFilters scfilters;
- private ParseUtil parseUtil;
- private URLNormalizers normalizers;
- private ProtocolFactory protocolFactory;
- private long maxCrawlDelay;
- private String queueMode;
- private int maxRedirect;
- private String reprUrl;
- private boolean redirecting;
- private int redirectCount;
- private boolean ignoreExternalLinks;
-
- // Used by fetcher.follow.outlinks.depth in parse
- private int maxOutlinksPerPage;
- private final int maxOutlinks;
- private final int interval;
- private int maxOutlinkDepth;
- private int maxOutlinkDepthNumLinks;
- private boolean outlinksIgnoreExternal;
-
- private int outlinksDepthDivisor;
- private boolean skipTruncated;
-
- private boolean halted = false;
-
- public FetcherThread(Configuration conf) {
- this.setDaemon(true); // don't hang JVM on exit
- this.setName("FetcherThread"); // use an informative name
- this.conf = conf;
- this.urlFilters = new URLFilters(conf);
- this.scfilters = new ScoringFilters(conf);
- this.parseUtil = new ParseUtil(conf);
- this.skipTruncated = conf.getBoolean(ParseSegment.SKIP_TRUNCATED, true);
- this.protocolFactory = new ProtocolFactory(conf);
- this.normalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_FETCHER);
- this.maxCrawlDelay = conf.getInt("fetcher.max.crawl.delay", 30) * 1000;
- queueMode = conf.get("fetcher.queue.mode",
- FetchItemQueues.QUEUE_MODE_HOST);
- // check that the mode is known
- if (!queueMode.equals(FetchItemQueues.QUEUE_MODE_IP)
- && !queueMode.equals(FetchItemQueues.QUEUE_MODE_DOMAIN)
- && !queueMode.equals(FetchItemQueues.QUEUE_MODE_HOST)) {
- LOG.error("Unknown partition mode : " + queueMode
- + " - forcing to byHost");
- queueMode = FetchItemQueues.QUEUE_MODE_HOST;
- }
- LOG.info("Using queue mode : " + queueMode);
- this.maxRedirect = conf.getInt("http.redirect.max", 3);
- this.ignoreExternalLinks = conf.getBoolean("db.ignore.external.links",
- false);
-
- maxOutlinksPerPage = conf.getInt("db.max.outlinks.per.page", 100);
- maxOutlinks = (maxOutlinksPerPage < 0) ? Integer.MAX_VALUE
- : maxOutlinksPerPage;
- interval = conf.getInt("db.fetch.interval.default", 2592000);
- ignoreExternalLinks = conf.getBoolean("db.ignore.external.links", false);
- maxOutlinkDepth = conf.getInt("fetcher.follow.outlinks.depth", -1);
- outlinksIgnoreExternal = conf.getBoolean(
- "fetcher.follow.outlinks.ignore.external", false);
- maxOutlinkDepthNumLinks = conf.getInt(
- "fetcher.follow.outlinks.num.links", 4);
- outlinksDepthDivisor = conf.getInt(
- "fetcher.follow.outlinks.depth.divisor", 2);
- }
-
- @SuppressWarnings("fallthrough")
- public void run() {
- activeThreads.incrementAndGet(); // count threads
-
- FetchItem fit = null;
- try {
-
- while (true) {
- // check whether must be stopped
- if (isHalted()) {
- LOG.debug(getName() + " set to halted");
- fit = null;
- return;
- }
-
- fit = fetchQueues.getFetchItem();
- if (fit == null) {
- if (feeder.isAlive() || fetchQueues.getTotalSize() > 0) {
- LOG.debug(getName() + " spin-waiting ...");
- // spin-wait.
- spinWaiting.incrementAndGet();
- try {
- Thread.sleep(500);
- } catch (Exception e) {
- }
- spinWaiting.decrementAndGet();
- continue;
- } else {
- // all done, finish this thread
- LOG.info("Thread " + getName() + " has no more work available");
- return;
- }
- }
- lastRequestStart.set(System.currentTimeMillis());
- Text reprUrlWritable = (Text) fit.datum.getMetaData().get(
- Nutch.WRITABLE_REPR_URL_KEY);
- if (reprUrlWritable == null) {
- reprUrl = fit.url.toString();
- } else {
- reprUrl = reprUrlWritable.toString();
- }
- try {
- // fetch the page
- redirecting = false;
- redirectCount = 0;
- do {
- if (LOG.isInfoEnabled()) {
- LOG.info("fetching " + fit.url + " (queue crawl delay="
- + fetchQueues.getFetchItemQueue(fit.queueID).crawlDelay
- + "ms)");
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("redirectCount=" + redirectCount);
- }
- redirecting = false;
- Protocol protocol = this.protocolFactory.getProtocol(fit.url
- .toString());
- BaseRobotRules rules = protocol.getRobotRules(fit.url, fit.datum);
- if (!rules.isAllowed(fit.u.toString())) {
- // unblock
- fetchQueues.finishFetchItem(fit, true);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Denied by robots.txt: " + fit.url);
- }
- output(fit.url, fit.datum, null,
- ProtocolStatus.STATUS_ROBOTS_DENIED,
- CrawlDatum.STATUS_FETCH_GONE);
- reporter.incrCounter("FetcherStatus", "robots_denied", 1);
- continue;
- }
- if (rules.getCrawlDelay() > 0) {
- if (rules.getCrawlDelay() > maxCrawlDelay && maxCrawlDelay >= 0) {
- // unblock
- fetchQueues.finishFetchItem(fit, true);
- LOG.debug("Crawl-Delay for " + fit.url + " too long ("
- + rules.getCrawlDelay() + "), skipping");
- output(fit.url, fit.datum, null,
- ProtocolStatus.STATUS_ROBOTS_DENIED,
- CrawlDatum.STATUS_FETCH_GONE);
- reporter.incrCounter("FetcherStatus",
- "robots_denied_maxcrawldelay", 1);
- continue;
- } else {
- FetchItemQueue fiq = fetchQueues
- .getFetchItemQueue(fit.queueID);
- fiq.crawlDelay = rules.getCrawlDelay();
- if (LOG.isDebugEnabled()) {
- LOG.info("Crawl delay for queue: " + fit.queueID
- + " is set to " + fiq.crawlDelay
- + " as per robots.txt. url: " + fit.url);
- }
- }
- }
- ProtocolOutput output = protocol.getProtocolOutput(fit.url,
- fit.datum);
- ProtocolStatus status = output.getStatus();
- Content content = output.getContent();
- ParseStatus pstatus = null;
- // unblock queue
- fetchQueues.finishFetchItem(fit);
-
- String urlString = fit.url.toString();
-
- reporter.incrCounter("FetcherStatus", status.getName(), 1);
-
- switch (status.getCode()) {
-
- case ProtocolStatus.WOULDBLOCK:
- // retry ?
- fetchQueues.addFetchItem(fit);
- break;
-
- case ProtocolStatus.SUCCESS: // got a page
- pstatus = output(fit.url, fit.datum, content, status,
- CrawlDatum.STATUS_FETCH_SUCCESS, fit.outlinkDepth);
- updateStatus(content.getContent().length);
- if (pstatus != null && pstatus.isSuccess()
- && pstatus.getMinorCode() == ParseStatus.SUCCESS_REDIRECT) {
- String newUrl = pstatus.getMessage();
- int refreshTime = Integer.valueOf(pstatus.getArgs()[1]);
- Text redirUrl = handleRedirect(fit.url, fit.datum, urlString,
- newUrl, refreshTime < Fetcher.PERM_REFRESH_TIME,
- Fetcher.CONTENT_REDIR);
- if (redirUrl != null) {
- fit = queueRedirect(redirUrl, fit);
- }
- }
- break;
-
- case ProtocolStatus.MOVED: // redirect
- case ProtocolStatus.TEMP_MOVED:
- int code;
- boolean temp;
- if (status.getCode() == ProtocolStatus.MOVED) {
- code = CrawlDatum.STATUS_FETCH_REDIR_PERM;
- temp = false;
- } else {
- code = CrawlDatum.STATUS_FETCH_REDIR_TEMP;
- temp = true;
- }
- output(fit.url, fit.datum, content, status, code);
- String newUrl = status.getMessage();
- Text redirUrl = handleRedirect(fit.url, fit.datum, urlString,
- newUrl, temp, Fetcher.PROTOCOL_REDIR);
- if (redirUrl != null) {
- fit = queueRedirect(redirUrl, fit);
- } else {
- // stop redirecting
- redirecting = false;
- }
- break;
-
- case ProtocolStatus.EXCEPTION:
- logError(fit.url, status.getMessage());
- int killedURLs = fetchQueues.checkExceptionThreshold(fit
- .getQueueID());
- if (killedURLs != 0)
- reporter.incrCounter("FetcherStatus",
- "AboveExceptionThresholdInQueue", killedURLs);
- /* FALLTHROUGH */
- case ProtocolStatus.RETRY: // retry
- case ProtocolStatus.BLOCKED:
- output(fit.url, fit.datum, null, status,
- CrawlDatum.STATUS_FETCH_RETRY);
- break;
-
- case ProtocolStatus.GONE: // gone
- case ProtocolStatus.NOTFOUND:
- case ProtocolStatus.ACCESS_DENIED:
- case ProtocolStatus.ROBOTS_DENIED:
- output(fit.url, fit.datum, null, status,
- CrawlDatum.STATUS_FETCH_GONE);
- break;
-
- case ProtocolStatus.NOTMODIFIED:
- output(fit.url, fit.datum, null, status,
- CrawlDatum.STATUS_FETCH_NOTMODIFIED);
- break;
-
- default:
- if (LOG.isWarnEnabled()) {
- LOG.warn("Unknown ProtocolStatus: " + status.getCode());
- }
- output(fit.url, fit.datum, null, status,
- CrawlDatum.STATUS_FETCH_RETRY);
- }
-
- if (redirecting && redirectCount > maxRedirect) {
- fetchQueues.finishFetchItem(fit);
- if (LOG.isInfoEnabled()) {
- LOG.info(" - redirect count exceeded " + fit.url);
- }
- output(fit.url, fit.datum, null,
- ProtocolStatus.STATUS_REDIR_EXCEEDED,
- CrawlDatum.STATUS_FETCH_GONE);
- }
-
- } while (redirecting && (redirectCount <= maxRedirect));
-
- } catch (Throwable t) { // unexpected exception
- // unblock
- fetchQueues.finishFetchItem(fit);
- logError(fit.url, StringUtils.stringifyException(t));
- output(fit.url, fit.datum, null, ProtocolStatus.STATUS_FAILED,
- CrawlDatum.STATUS_FETCH_RETRY);
- }
- }
-
- } catch (Throwable e) {
- if (LOG.isErrorEnabled()) {
- LOG.error("fetcher caught:" + e.toString());
- }
- } finally {
- if (fit != null)
- fetchQueues.finishFetchItem(fit);
- activeThreads.decrementAndGet(); // count threads
- LOG.info("-finishing thread " + getName() + ", activeThreads="
- + activeThreads);
- }
- }
-
- private Text handleRedirect(Text url, CrawlDatum datum, String urlString,
- String newUrl, boolean temp, String redirType)
- throws MalformedURLException, URLFilterException {
- newUrl = normalizers.normalize(newUrl, URLNormalizers.SCOPE_FETCHER);
- newUrl = urlFilters.filter(newUrl);
-
- if (ignoreExternalLinks) {
- try {
- String origHost = new URL(urlString).getHost().toLowerCase();
- String newHost = new URL(newUrl).getHost().toLowerCase();
- if (!origHost.equals(newHost)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(" - ignoring redirect " + redirType + " from "
- + urlString + " to " + newUrl
- + " because external links are ignored");
- }
- return null;
- }
- } catch (MalformedURLException e) {
- }
- }
-
- if (newUrl != null && !newUrl.equals(urlString)) {
- reprUrl = URLUtil.chooseRepr(reprUrl, newUrl, temp);
- url = new Text(newUrl);
- if (maxRedirect > 0) {
- redirecting = true;
- redirectCount++;
- if (LOG.isDebugEnabled()) {
- LOG.debug(" - " + redirType + " redirect to " + url
- + " (fetching now)");
- }
- return url;
- } else {
- CrawlDatum newDatum = new CrawlDatum(CrawlDatum.STATUS_LINKED,
- datum.getFetchInterval(), datum.getScore());
- // transfer existing metadata
- newDatum.getMetaData().putAll(datum.getMetaData());
- try {
- scfilters.initialScore(url, newDatum);
- } catch (ScoringFilterException e) {
- e.printStackTrace();
- }
- if (reprUrl != null) {
- newDatum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY,
- new Text(reprUrl));
- }
- output(url, newDatum, null, null, CrawlDatum.STATUS_LINKED);
- if (LOG.isDebugEnabled()) {
- LOG.debug(" - " + redirType + " redirect to " + url
- + " (fetching later)");
- }
- return null;
- }
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug(" - " + redirType + " redirect skipped: "
- + (newUrl != null ? "to same url" : "filtered"));
- }
- return null;
- }
- }
-
- private FetchItem queueRedirect(Text redirUrl, FetchItem fit)
- throws ScoringFilterException {
- CrawlDatum newDatum = new CrawlDatum(CrawlDatum.STATUS_DB_UNFETCHED,
- fit.datum.getFetchInterval(), fit.datum.getScore());
- // transfer all existing metadata to the redirect
- newDatum.getMetaData().putAll(fit.datum.getMetaData());
- scfilters.initialScore(redirUrl, newDatum);
- if (reprUrl != null) {
- newDatum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY,
- new Text(reprUrl));
- }
- fit = FetchItem.create(redirUrl, newDatum, queueMode);
- if (fit != null) {
- FetchItemQueue fiq = fetchQueues.getFetchItemQueue(fit.queueID);
- fiq.addInProgressFetchItem(fit);
- } else {
- // stop redirecting
- redirecting = false;
- reporter.incrCounter("FetcherStatus", "FetchItem.notCreated.redirect",
- 1);
- }
- return fit;
- }
-
- private void logError(Text url, String message) {
- if (LOG.isInfoEnabled()) {
- LOG.info("fetch of " + url + " failed with: " + message);
- }
- errors.incrementAndGet();
- }
-
- private ParseStatus output(Text key, CrawlDatum datum, Content content,
- ProtocolStatus pstatus, int status) {
-
- return output(key, datum, content, pstatus, status, 0);
- }
-
- private ParseStatus output(Text key, CrawlDatum datum, Content content,
- ProtocolStatus pstatus, int status, int outlinkDepth) {
-
- datum.setStatus(status);
- datum.setFetchTime(System.currentTimeMillis());
- if (pstatus != null)
- datum.getMetaData().put(Nutch.WRITABLE_PROTO_STATUS_KEY, pstatus);
-
- ParseResult parseResult = null;
- if (content != null) {
- Metadata metadata = content.getMetadata();
-
- // store the guessed content type in the crawldatum
- if (content.getContentType() != null)
- datum.getMetaData().put(new Text(Metadata.CONTENT_TYPE),
- new Text(content.getContentType()));
-
- // add segment to metadata
- metadata.set(Nutch.SEGMENT_NAME_KEY, segmentName);
- // add score to content metadata so that ParseSegment can pick it up.
- try {
- scfilters.passScoreBeforeParsing(key, datum, content);
- } catch (Exception e) {
- if (LOG.isWarnEnabled()) {
- LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
- }
- }
- /*
- * Note: Fetcher will only follow meta-redirects coming from the
- * original URL.
- */
- if (parsing && status == CrawlDatum.STATUS_FETCH_SUCCESS) {
- if (!skipTruncated
- || (skipTruncated && !ParseSegment.isTruncated(content))) {
- try {
- parseResult = this.parseUtil.parse(content);
- } catch (Exception e) {
- LOG.warn("Error parsing: " + key + ": "
- + StringUtils.stringifyException(e));
- }
- }
-
- if (parseResult == null) {
- byte[] signature = SignatureFactory.getSignature(getConf())
- .calculate(content, new ParseStatus().getEmptyParse(conf));
- datum.setSignature(signature);
- }
- }
-
- /*
- * Store status code in content So we can read this value during parsing
- * (as a separate job) and decide to parse or not.
- */
- content.getMetadata().add(Nutch.FETCH_STATUS_KEY,
- Integer.toString(status));
- }
-
- try {
- output.collect(key, new NutchWritable(datum));
- if (content != null && storingContent)
- output.collect(key, new NutchWritable(content));
- if (parseResult != null) {
- for (Entry<Text, Parse> entry : parseResult) {
- Text url = entry.getKey();
- Parse parse = entry.getValue();
- ParseStatus parseStatus = parse.getData().getStatus();
- ParseData parseData = parse.getData();
-
- if (!parseStatus.isSuccess()) {
- LOG.warn("Error parsing: " + key + ": " + parseStatus);
- parse = parseStatus.getEmptyParse(getConf());
- }
-
- // Calculate page signature. For non-parsing fetchers this will
- // be done in ParseSegment
- byte[] signature = SignatureFactory.getSignature(getConf())
- .calculate(content, parse);
- // Ensure segment name and score are in parseData metadata
- parseData.getContentMeta().set(Nutch.SEGMENT_NAME_KEY, segmentName);
- parseData.getContentMeta().set(Nutch.SIGNATURE_KEY,
- StringUtil.toHexString(signature));
- // Pass fetch time to content meta
- parseData.getContentMeta().set(Nutch.FETCH_TIME_KEY,
- Long.toString(datum.getFetchTime()));
- if (url.equals(key))
- datum.setSignature(signature);
- try {
- scfilters.passScoreAfterParsing(url, content, parse);
- } catch (Exception e) {
- if (LOG.isWarnEnabled()) {
- LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
- }
- }
-
- String fromHost;
-
- // collect outlinks for subsequent db update
- Outlink[] links = parseData.getOutlinks();
- int outlinksToStore = Math.min(maxOutlinks, links.length);
- if (ignoreExternalLinks) {
- try {
- fromHost = new URL(url.toString()).getHost().toLowerCase();
- } catch (MalformedURLException e) {
- fromHost = null;
- }
- } else {
- fromHost = null;
- }
-
- int validCount = 0;
-
- // Process all outlinks, normalize, filter and deduplicate
- List<Outlink> outlinkList = new ArrayList<Outlink>(outlinksToStore);
- HashSet<String> outlinks = new HashSet<String>(outlinksToStore);
- for (int i = 0; i < links.length && validCount < outlinksToStore; i++) {
- String toUrl = links[i].getToUrl();
-
- toUrl = ParseOutputFormat.filterNormalize(url.toString(), toUrl,
- fromHost, ignoreExternalLinks, urlFilters, normalizers);
- if (toUrl == null) {
- continue;
- }
-
- validCount++;
- links[i].setUrl(toUrl);
- outlinkList.add(links[i]);
- outlinks.add(toUrl);
- }
-
- // Only process depth N outlinks
- if (maxOutlinkDepth > 0 && outlinkDepth < maxOutlinkDepth) {
- reporter.incrCounter("FetcherOutlinks", "outlinks_detected",
- outlinks.size());
-
- // Counter to limit num outlinks to follow per page
- int outlinkCounter = 0;
-
- // Calculate variable number of outlinks by depth using the
- // divisor (outlinks = Math.floor(divisor / depth * num.links))
- int maxOutlinksByDepth = (int) Math.floor(outlinksDepthDivisor
- / (outlinkDepth + 1) * maxOutlinkDepthNumLinks);
-
- String followUrl;
-
- // Walk over the outlinks and add as new FetchItem to the queues
- Iterator<String> iter = outlinks.iterator();
- while (iter.hasNext() && outlinkCounter < maxOutlinkDepthNumLinks) {
- followUrl = iter.next();
-
- // Check whether we'll follow external outlinks
- if (outlinksIgnoreExternal) {
- if (!URLUtil.getHost(url.toString()).equals(
- URLUtil.getHost(followUrl))) {
- continue;
- }
- }
-
- reporter
- .incrCounter("FetcherOutlinks", "outlinks_following", 1);
-
- // Create new FetchItem with depth incremented
- FetchItem fit = FetchItem.create(new Text(followUrl),
- new CrawlDatum(CrawlDatum.STATUS_LINKED, interval),
- queueMode, outlinkDepth + 1);
- fetchQueues.addFetchItem(fit);
-
- outlinkCounter++;
- }
- }
-
- // Overwrite the outlinks in ParseData with the normalized and
- // filtered set
- parseData.setOutlinks(outlinkList.toArray(new Outlink[outlinkList
- .size()]));
-
- output.collect(url, new NutchWritable(new ParseImpl(new ParseText(
- parse.getText()), parseData, parse.isCanonical())));
- }
- }
- } catch (IOException e) {
- if (LOG.isErrorEnabled()) {
- LOG.error("fetcher caught:" + e.toString());
- }
- }
-
- // return parse status if it exits
- if (parseResult != null && !parseResult.isEmpty()) {
- Parse p = parseResult.get(content.getUrl());
- if (p != null) {
- reporter.incrCounter("ParserStatus", ParseStatus.majorCodes[p
- .getData().getStatus().getMajorCode()], 1);
- return p.getData().getStatus();
- }
- }
- return null;
- }
-
- public synchronized void setHalted(boolean halted) {
- this.halted = halted;
- }
-
- public synchronized boolean isHalted() {
- return halted;
- }
-
- }
-
public Fetcher() {
super(null);
}
@@ -1206,11 +132,6 @@ public class Fetcher extends NutchTool i
super(conf);
}
- private void updateStatus(int bytesInPage) throws IOException {
- pages.incrementAndGet();
- bytes.addAndGet(bytesInPage);
- }
-
private void reportStatus(int pagesLastSec, int bytesLastSec)
throws IOException {
StringBuilder status = new StringBuilder();
@@ -1292,7 +213,9 @@ public class Fetcher extends NutchTool i
getConf().setBoolean(Protocol.CHECK_ROBOTS, false);
for (int i = 0; i < threadCount; i++) { // spawn threads
- FetcherThread t = new FetcherThread(getConf());
+ FetcherThread t = new FetcherThread(getConf(), getActiveThreads(), fetchQueues,
+ feeder, spinWaiting, lastRequestStart, reporter, errors, segmentName,
+ parsing, output, storingContent, pages, bytes);
fetcherThreads.add(t);
t.start();
}
@@ -1437,7 +360,9 @@ public class Fetcher extends NutchTool i
+ additionalThreads + " new threads");
// activate new threads
for (int i = 0; i < additionalThreads; i++) {
- FetcherThread thread = new FetcherThread(getConf());
+ FetcherThread thread = new FetcherThread(getConf(), getActiveThreads(), fetchQueues,
+ feeder, spinWaiting, lastRequestStart, reporter, errors, segmentName, parsing,
+ output, storingContent, pages, bytes);
fetcherThreads.add(thread);
thread.start();
}
@@ -1479,7 +404,7 @@ public class Fetcher extends NutchTool i
FetcherThread thread = fetcherThreads.get(i);
if (thread.isAlive()) {
LOG.warn("Thread #" + i + " hung while processing "
- + thread.reprUrl);
+ + thread.getReprUrl());
if (LOG.isDebugEnabled()) {
StackTraceElement[] stack = thread.getStackTrace();
StringBuilder sb = new StringBuilder();
@@ -1626,6 +551,10 @@ public class Fetcher extends NutchTool i
}
}
+ private AtomicInteger getActiveThreads() {
+ return activeThreads;
+ }
+
@Override
public Map<String, Object> run(Map<String, String> args, String crawlId)
throws Exception {
Added: nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherThread.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherThread.java?rev=1678281&view=auto
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherThread.java (added)
+++ nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherThread.java Fri May 8 04:25:05 2015
@@ -0,0 +1,722 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.fetcher;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.NutchWritable;
+import org.apache.nutch.crawl.SignatureFactory;
+import org.apache.nutch.metadata.Metadata;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.net.URLFilterException;
+import org.apache.nutch.net.URLFilters;
+import org.apache.nutch.net.URLNormalizers;
+import org.apache.nutch.parse.Outlink;
+import org.apache.nutch.parse.Parse;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.parse.ParseImpl;
+import org.apache.nutch.parse.ParseOutputFormat;
+import org.apache.nutch.parse.ParseResult;
+import org.apache.nutch.parse.ParseSegment;
+import org.apache.nutch.parse.ParseStatus;
+import org.apache.nutch.parse.ParseText;
+import org.apache.nutch.parse.ParseUtil;
+import org.apache.nutch.protocol.Content;
+import org.apache.nutch.protocol.Protocol;
+import org.apache.nutch.protocol.ProtocolFactory;
+import org.apache.nutch.protocol.ProtocolOutput;
+import org.apache.nutch.protocol.ProtocolStatus;
+import org.apache.nutch.scoring.ScoringFilterException;
+import org.apache.nutch.scoring.ScoringFilters;
+import org.apache.nutch.util.StringUtil;
+import org.apache.nutch.util.URLUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import crawlercommons.robots.BaseRobotRules;
+
+/**
+ * This class picks items from queues and fetches the pages.
+ */
+public class FetcherThread extends Thread {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FetcherThread.class);
+
+ private Configuration conf;
+ private URLFilters urlFilters;
+ private ScoringFilters scfilters;
+ private ParseUtil parseUtil;
+ private URLNormalizers normalizers;
+ private ProtocolFactory protocolFactory;
+ private long maxCrawlDelay;
+ private String queueMode;
+ private int maxRedirect;
+ private String reprUrl;
+ private boolean redirecting;
+ private int redirectCount;
+ private boolean ignoreExternalLinks;
+
+ // Used by fetcher.follow.outlinks.depth in parse
+ private int maxOutlinksPerPage;
+ private final int maxOutlinks;
+ private final int interval;
+ private int maxOutlinkDepth;
+ private int maxOutlinkDepthNumLinks;
+ private boolean outlinksIgnoreExternal;
+
+ private int outlinksDepthDivisor;
+ private boolean skipTruncated;
+
+ private boolean halted = false;
+
+ private AtomicInteger activeThreads;
+
+ private Object fetchQueues;
+
+ private QueueFeeder feeder;
+
+ private Object spinWaiting;
+
+ private AtomicLong lastRequestStart;
+
+ private Reporter reporter;
+
+ private AtomicInteger errors;
+
+ private String segmentName;
+
+ private boolean parsing;
+
+ private OutputCollector<Text, NutchWritable> output;
+
+ private boolean storingContent;
+
+ private AtomicInteger pages;
+
+ private AtomicLong bytes;
+
+ public FetcherThread(Configuration conf, AtomicInteger activeThreads,
+ FetchItemQueues fetchQueues, QueueFeeder feeder, AtomicInteger spinWaiting,
+ AtomicLong lastRequestStart, Reporter reporter, AtomicInteger errors,
+ String segmentName, boolean parsing, OutputCollector<Text, NutchWritable> output,
+ boolean storingContent, AtomicInteger pages, AtomicLong bytes) {
+ this.setDaemon(true); // don't hang JVM on exit
+ this.setName("FetcherThread"); // use an informative name
+ this.conf = conf;
+ this.urlFilters = new URLFilters(conf);
+ this.scfilters = new ScoringFilters(conf);
+ this.parseUtil = new ParseUtil(conf);
+ this.skipTruncated = conf.getBoolean(ParseSegment.SKIP_TRUNCATED, true);
+ this.protocolFactory = new ProtocolFactory(conf);
+ this.normalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_FETCHER);
+ this.maxCrawlDelay = conf.getInt("fetcher.max.crawl.delay", 30) * 1000;
+ this.activeThreads = activeThreads;
+ this.fetchQueues = fetchQueues;
+ this.feeder = feeder;
+ this.spinWaiting = spinWaiting;
+ this.lastRequestStart = lastRequestStart;
+ this.reporter = reporter;
+ this.errors = errors;
+ this.segmentName = segmentName;
+ this.parsing = parsing;
+ this.output = output;
+ this.storingContent = storingContent;
+ this.pages = pages;
+ this.bytes = bytes;
+ queueMode = conf.get("fetcher.queue.mode",
+ FetchItemQueues.QUEUE_MODE_HOST);
+ // check that the mode is known
+ if (!queueMode.equals(FetchItemQueues.QUEUE_MODE_IP)
+ && !queueMode.equals(FetchItemQueues.QUEUE_MODE_DOMAIN)
+ && !queueMode.equals(FetchItemQueues.QUEUE_MODE_HOST)) {
+ LOG.error("Unknown partition mode : " + queueMode
+ + " - forcing to byHost");
+ queueMode = FetchItemQueues.QUEUE_MODE_HOST;
+ }
+ LOG.info("Using queue mode : " + queueMode);
+ this.maxRedirect = conf.getInt("http.redirect.max", 3);
+ this.ignoreExternalLinks = conf.getBoolean("db.ignore.external.links",
+ false);
+
+ maxOutlinksPerPage = conf.getInt("db.max.outlinks.per.page", 100);
+ maxOutlinks = (maxOutlinksPerPage < 0) ? Integer.MAX_VALUE
+ : maxOutlinksPerPage;
+ interval = conf.getInt("db.fetch.interval.default", 2592000);
+ maxOutlinkDepth = conf.getInt("fetcher.follow.outlinks.depth", -1);
+ outlinksIgnoreExternal = conf.getBoolean(
+ "fetcher.follow.outlinks.ignore.external", false);
+ maxOutlinkDepthNumLinks = conf.getInt(
+ "fetcher.follow.outlinks.num.links", 4);
+ outlinksDepthDivisor = conf.getInt(
+ "fetcher.follow.outlinks.depth.divisor", 2);
+ }
+
+ @SuppressWarnings("fallthrough")
+ public void run() {
+ activeThreads.incrementAndGet(); // count threads
+
+ FetchItem fit = null;
+ try {
+
+ while (true) {
+ // check whether must be stopped
+ if (isHalted()) {
+ LOG.debug(getName() + " set to halted");
+ fit = null;
+ return;
+ }
+
+ fit = fetchQueues.getFetchItem();
+ if (fit == null) {
+ if (feeder.isAlive() || fetchQueues.getTotalSize() > 0) {
+ LOG.debug(getName() + " spin-waiting ...");
+ // spin-wait.
+ spinWaiting.incrementAndGet();
+ try {
+ Thread.sleep(500);
+ } catch (Exception e) {
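+ // interrupted while spin-waiting: loop around and re-check the queues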
+ }
+ spinWaiting.decrementAndGet();
+ continue;
+ } else {
+ // all done, finish this thread
+ LOG.info("Thread " + getName() + " has no more work available");
+ return;
+ }
+ }
+ lastRequestStart.set(System.currentTimeMillis());
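+ // seed the representative URL from the datum's metadata, if one was recorded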
+ Text reprUrlWritable = (Text) fit.datum.getMetaData().get(
+ Nutch.WRITABLE_REPR_URL_KEY);
+ if (reprUrlWritable == null) {
+ setReprUrl(fit.url.toString());
+ } else {
+ setReprUrl(reprUrlWritable.toString());
+ }
+ try {
+ // fetch the page
+ redirecting = false;
+ redirectCount = 0;
+ do {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("fetching " + fit.url + " (queue crawl delay="
+ + ((FetchItemQueues)
fetchQueues).getFetchItemQueue(fit.queueID).crawlDelay
+ + "ms)");
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("redirectCount=" + redirectCount);
+ }
+ redirecting = false;
+ Protocol protocol = this.protocolFactory.getProtocol(fit.url
+ .toString());
+ BaseRobotRules rules = protocol.getRobotRules(fit.url, fit.datum);
+ if (!rules.isAllowed(fit.u.toString())) {
+ // unblock
+ fetchQueues.finishFetchItem(fit, true);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Denied by robots.txt: " + fit.url);
+ }
+ output(fit.url, fit.datum, null,
+ ProtocolStatus.STATUS_ROBOTS_DENIED,
+ CrawlDatum.STATUS_FETCH_GONE);
+ reporter.incrCounter("FetcherStatus", "robots_denied", 1);
+ continue;
+ }
+ if (rules.getCrawlDelay() > 0) {
+ if (rules.getCrawlDelay() > maxCrawlDelay && maxCrawlDelay >= 0) {
+ // unblock
+ fetchQueues.finishFetchItem(fit, true);
+ LOG.debug("Crawl-Delay for " + fit.url + " too long ("
+ + rules.getCrawlDelay() + "), skipping");
+ output(fit.url, fit.datum, null,
+ ProtocolStatus.STATUS_ROBOTS_DENIED,
+ CrawlDatum.STATUS_FETCH_GONE);
+ reporter.incrCounter("FetcherStatus",
+ "robots_denied_maxcrawldelay", 1);
+ continue;
+ } else {
+ FetchItemQueue fiq = fetchQueues.getFetchItemQueue(fit.queueID);
+ fiq.crawlDelay = rules.getCrawlDelay();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Crawl delay for queue: " + fit.queueID
+ + " is set to " + fiq.crawlDelay
+ + " as per robots.txt. url: " + fit.url);
+ }
+ }
+ }
+ ProtocolOutput output = protocol.getProtocolOutput(fit.url,
+ fit.datum);
+ ProtocolStatus status = output.getStatus();
+ Content content = output.getContent();
+ ParseStatus pstatus = null;
+ // unblock queue
+ fetchQueues.finishFetchItem(fit);
+
+ String urlString = fit.url.toString();
+
+ reporter.incrCounter("FetcherStatus", status.getName(), 1);
+
+ switch (status.getCode()) {
+
+ case ProtocolStatus.WOULDBLOCK:
+ // retry ?
+ fetchQueues.addFetchItem(fit);
+ break;
+
+ case ProtocolStatus.SUCCESS: // got a page
+ pstatus = output(fit.url, fit.datum, content, status,
+ CrawlDatum.STATUS_FETCH_SUCCESS, fit.outlinkDepth);
+ updateStatus(content.getContent().length);
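+ // a successful parse may still carry a meta-refresh redirect; treat it like a content redirect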
+ if (pstatus != null && pstatus.isSuccess()
+ && pstatus.getMinorCode() == ParseStatus.SUCCESS_REDIRECT) {
+ String newUrl = pstatus.getMessage();
+ int refreshTime = Integer.valueOf(pstatus.getArgs()[1]);
+ Text redirUrl = handleRedirect(fit.url, fit.datum, urlString,
+ newUrl, refreshTime < Fetcher.PERM_REFRESH_TIME,
+ Fetcher.CONTENT_REDIR);
+ if (redirUrl != null) {
+ queueRedirect(redirUrl, fit);
+ }
+ }
+ break;
+
+ case ProtocolStatus.MOVED: // redirect
+ case ProtocolStatus.TEMP_MOVED:
+ int code;
+ boolean temp;
+ if (status.getCode() == ProtocolStatus.MOVED) {
+ code = CrawlDatum.STATUS_FETCH_REDIR_PERM;
+ temp = false;
+ } else {
+ code = CrawlDatum.STATUS_FETCH_REDIR_TEMP;
+ temp = true;
+ }
+ output(fit.url, fit.datum, content, status, code);
+ String newUrl = status.getMessage();
+ Text redirUrl = handleRedirect(fit.url, fit.datum, urlString,
+ newUrl, temp, Fetcher.PROTOCOL_REDIR);
+ if (redirUrl != null) {
+ queueRedirect(redirUrl, fit);
+ } else {
+ // stop redirecting
+ redirecting = false;
+ }
+ break;
+
+ case ProtocolStatus.EXCEPTION:
+ logError(fit.url, status.getMessage());
+ int killedURLs = fetchQueues.checkExceptionThreshold(fit.getQueueID());
+ if (killedURLs != 0)
+ reporter.incrCounter("FetcherStatus",
+ "AboveExceptionThresholdInQueue", killedURLs);
+ /* FALLTHROUGH */
+ case ProtocolStatus.RETRY: // retry
+ case ProtocolStatus.BLOCKED:
+ output(fit.url, fit.datum, null, status,
+ CrawlDatum.STATUS_FETCH_RETRY);
+ break;
+
+ case ProtocolStatus.GONE: // gone
+ case ProtocolStatus.NOTFOUND:
+ case ProtocolStatus.ACCESS_DENIED:
+ case ProtocolStatus.ROBOTS_DENIED:
+ output(fit.url, fit.datum, null, status,
+ CrawlDatum.STATUS_FETCH_GONE);
+ break;
+
+ case ProtocolStatus.NOTMODIFIED:
+ output(fit.url, fit.datum, null, status,
+ CrawlDatum.STATUS_FETCH_NOTMODIFIED);
+ break;
+
+ default:
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("Unknown ProtocolStatus: " + status.getCode());
+ }
+ output(fit.url, fit.datum, null, status,
+ CrawlDatum.STATUS_FETCH_RETRY);
+ }
+
+ if (redirecting && redirectCount > maxRedirect) {
+ fetchQueues.finishFetchItem(fit);
+ if (LOG.isInfoEnabled()) {
+ LOG.info(" - redirect count exceeded " + fit.url);
+ }
+ output(fit.url, fit.datum, null,
+ ProtocolStatus.STATUS_REDIR_EXCEEDED,
+ CrawlDatum.STATUS_FETCH_GONE);
+ }
+
+ } while (redirecting && (redirectCount <= maxRedirect));
+
+ } catch (Throwable t) { // unexpected exception
+ // unblock
+ fetchQueues.finishFetchItem(fit);
+ logError(fit.url, StringUtils.stringifyException(t));
+ output(fit.url, fit.datum, null, ProtocolStatus.STATUS_FAILED,
+ CrawlDatum.STATUS_FETCH_RETRY);
+ }
+ }
+
+ } catch (Throwable e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("fetcher caught:" + e.toString());
+ }
+ } finally {
+ if (fit != null)
+ fetchQueues.finishFetchItem(fit);
+ activeThreads.decrementAndGet(); // count threads
+ LOG.info("-finishing thread " + getName() + ", activeThreads="
+ + activeThreads);
+ }
+ }
+
+ private Text handleRedirect(Text url, CrawlDatum datum, String urlString,
+ String newUrl, boolean temp, String redirType)
+ throws MalformedURLException, URLFilterException {
+ newUrl = normalizers.normalize(newUrl, URLNormalizers.SCOPE_FETCHER);
+ newUrl = urlFilters.filter(newUrl);
+
+ if (ignoreExternalLinks) {
+ try {
+ String origHost = new URL(urlString).getHost().toLowerCase();
+ String newHost = new URL(newUrl).getHost().toLowerCase();
+ if (!origHost.equals(newHost)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(" - ignoring redirect " + redirType + " from "
+ + urlString + " to " + newUrl
+ + " because external links are ignored");
+ }
+ return null;
+ }
+ } catch (MalformedURLException e) {
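+ // host comparison impossible: fall through and keep the redirect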
+ }
+ }
+
+ if (newUrl != null && !newUrl.equals(urlString)) {
+ reprUrl = URLUtil.chooseRepr(reprUrl, newUrl, temp);
+ url = new Text(newUrl);
+ if (maxRedirect > 0) {
+ redirecting = true;
+ redirectCount++;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(" - " + redirType + " redirect to " + url
+ + " (fetching now)");
+ }
+ return url;
+ } else {
+ CrawlDatum newDatum = new CrawlDatum(CrawlDatum.STATUS_LINKED,
+ datum.getFetchInterval(), datum.getScore());
+ // transfer existing metadata
+ newDatum.getMetaData().putAll(datum.getMetaData());
+ try {
+ scfilters.initialScore(url, newDatum);
+ } catch (ScoringFilterException e) {
+ LOG.warn("Couldn't calculate initial score for redirect " + url + " (" + e + ")");
+ }
+ if (reprUrl != null) {
+ newDatum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY,
+ new Text(reprUrl));
+ }
+ output(url, newDatum, null, null, CrawlDatum.STATUS_LINKED);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(" - " + redirType + " redirect to " + url
+ + " (fetching later)");
+ }
+ return null;
+ }
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(" - " + redirType + " redirect skipped: "
+ + (newUrl != null ? "to same url" : "filtered"));
+ }
+ return null;
+ }
+ }
+
+ private void queueRedirect(Text redirUrl, FetchItem fit)
+ throws ScoringFilterException {
+ CrawlDatum newDatum = new CrawlDatum(CrawlDatum.STATUS_DB_UNFETCHED,
+ fit.datum.getFetchInterval(), fit.datum.getScore());
+ // transfer all existing metadata to the redirect
+ newDatum.getMetaData().putAll(fit.datum.getMetaData());
+ scfilters.initialScore(redirUrl, newDatum);
+ if (reprUrl != null) {
+ newDatum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY,
+ new Text(reprUrl));
+ }
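+ // create a FetchItem for the redirect target and mark it in progress in its queue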
+ fit = FetchItem.create(redirUrl, newDatum, queueMode);
+ if (fit != null) {
+ FetchItemQueue fiq = fetchQueues.getFetchItemQueue(fit.queueID);
+ fiq.addInProgressFetchItem(fit);
+ } else {
+ // stop redirecting
+ redirecting = false;
+ reporter.incrCounter("FetcherStatus", "FetchItem.notCreated.redirect",
+ 1);
+ }
+ }
+
+ private void logError(Text url, String message) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("fetch of " + url + " failed with: " + message);
+ }
+ errors.incrementAndGet();
+ }
+
+ private ParseStatus output(Text key, CrawlDatum datum, Content content,
+ ProtocolStatus pstatus, int status) {
+
+ return output(key, datum, content, pstatus, status, 0);
+ }
+
+ private ParseStatus output(Text key, CrawlDatum datum, Content content,
+ ProtocolStatus pstatus, int status, int outlinkDepth) {
+
+ datum.setStatus(status);
+ datum.setFetchTime(System.currentTimeMillis());
+ if (pstatus != null)
+ datum.getMetaData().put(Nutch.WRITABLE_PROTO_STATUS_KEY, pstatus);
+
+ ParseResult parseResult = null;
+ if (content != null) {
+ Metadata metadata = content.getMetadata();
+
+ // store the guessed content type in the crawldatum
+ if (content.getContentType() != null)
+ datum.getMetaData().put(new Text(Metadata.CONTENT_TYPE),
+ new Text(content.getContentType()));
+
+ // add segment to metadata
+ metadata.set(Nutch.SEGMENT_NAME_KEY, segmentName);
+ // add score to content metadata so that ParseSegment can pick it up.
+ try {
+ scfilters.passScoreBeforeParsing(key, datum, content);
+ } catch (Exception e) {
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
+ }
+ }
+ /*
+ * Note: Fetcher will only follow meta-redirects coming from the
+ * original URL.
+ */
+ if (parsing && status == CrawlDatum.STATUS_FETCH_SUCCESS) {
+ if (!skipTruncated || !ParseSegment.isTruncated(content)) {
+ try {
+ parseResult = this.parseUtil.parse(content);
+ } catch (Exception e) {
+ LOG.warn("Error parsing: " + key + ": "
+ + StringUtils.stringifyException(e));
+ }
+ }
+
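+ // no parse: compute the signature over the raw content so deduplication still has one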
+ if (parseResult == null) {
+ byte[] signature = SignatureFactory.getSignature(conf)
+ .calculate(content, new ParseStatus().getEmptyParse(conf));
+ datum.setSignature(signature);
+ }
+ }
+
+ /*
+ * Store status code in content So we can read this value during parsing
+ * (as a separate job) and decide to parse or not.
+ */
+ content.getMetadata().add(Nutch.FETCH_STATUS_KEY,
+ Integer.toString(status));
+ }
+
+ try {
+ output.collect(key, new NutchWritable(datum));
+ if (content != null && storingContent)
+ output.collect(key, new NutchWritable(content));
+ if (parseResult != null) {
+ for (Entry<Text, Parse> entry : parseResult) {
+ Text url = entry.getKey();
+ Parse parse = entry.getValue();
+ ParseStatus parseStatus = parse.getData().getStatus();
+ ParseData parseData = parse.getData();
+
+ if (!parseStatus.isSuccess()) {
+ LOG.warn("Error parsing: " + key + ": " + parseStatus);
+ parse = parseStatus.getEmptyParse(conf);
+ }
+
+ // Calculate page signature. For non-parsing fetchers this will
+ // be done in ParseSegment
+ byte[] signature = SignatureFactory.getSignature(conf)
+ .calculate(content, parse);
+ // Ensure segment name and score are in parseData metadata
+ parseData.getContentMeta().set(Nutch.SEGMENT_NAME_KEY, segmentName);
+ parseData.getContentMeta().set(Nutch.SIGNATURE_KEY,
+ StringUtil.toHexString(signature));
+ // Pass fetch time to content meta
+ parseData.getContentMeta().set(Nutch.FETCH_TIME_KEY,
+ Long.toString(datum.getFetchTime()));
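+ // only the datum of the URL that was actually fetched carries the signature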
+ if (url.equals(key))
+ datum.setSignature(signature);
+ try {
+ scfilters.passScoreAfterParsing(url, content, parse);
+ } catch (Exception e) {
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
+ }
+ }
+
+ String fromHost;
+
+ // collect outlinks for subsequent db update
+ Outlink[] links = parseData.getOutlinks();
+ int outlinksToStore = Math.min(maxOutlinks, links.length);
+ if (ignoreExternalLinks) {
+ try {
+ fromHost = new URL(url.toString()).getHost().toLowerCase();
+ } catch (MalformedURLException e) {
+ fromHost = null;
+ }
+ } else {
+ fromHost = null;
+ }
+
+ int validCount = 0;
+
+ // Process all outlinks, normalize, filter and deduplicate
+ List<Outlink> outlinkList = new ArrayList<Outlink>(outlinksToStore);
+ HashSet<String> outlinks = new HashSet<String>(outlinksToStore);
+ for (int i = 0; i < links.length && validCount < outlinksToStore; i++) {
+ String toUrl = links[i].getToUrl();
+
+ toUrl = ParseOutputFormat.filterNormalize(url.toString(), toUrl,
+ fromHost, ignoreExternalLinks, urlFilters, normalizers);
+ if (toUrl == null) {
+ continue;
+ }
+
+ validCount++;
+ links[i].setUrl(toUrl);
+ outlinkList.add(links[i]);
+ outlinks.add(toUrl);
+ }
+
+ // Only process depth N outlinks
+ if (maxOutlinkDepth > 0 && outlinkDepth < maxOutlinkDepth) {
+ reporter.incrCounter("FetcherOutlinks", "outlinks_detected",
+ outlinks.size());
+
+ // Counter to limit num outlinks to follow per page
+ int outlinkCounter = 0;
+
+ // Calculate variable number of outlinks by depth using the
+ // divisor (outlinks = Math.floor(divisor / depth * num.links));
+ // cast to double so the division is not truncated before Math.floor
+ int maxOutlinksByDepth = (int) Math.floor((double) outlinksDepthDivisor
+ / (outlinkDepth + 1) * maxOutlinkDepthNumLinks);
+
+ String followUrl;
+
+ // Walk over the outlinks and add as new FetchItem to the queues
+ Iterator<String> iter = outlinks.iterator();
+ while (iter.hasNext() && outlinkCounter < maxOutlinksByDepth) {
+ followUrl = iter.next();
+
+ // Check whether we'll follow external outlinks
+ if (outlinksIgnoreExternal) {
+ if (!URLUtil.getHost(url.toString()).equals(
+ URLUtil.getHost(followUrl))) {
+ continue;
+ }
+ }
+
+ reporter
+ .incrCounter("FetcherOutlinks", "outlinks_following", 1);
+
+ // Create new FetchItem with depth incremented
+ FetchItem fit = FetchItem.create(new Text(followUrl),
+ new CrawlDatum(CrawlDatum.STATUS_LINKED, interval),
+ queueMode, outlinkDepth + 1);
+ fetchQueues.addFetchItem(fit);
+
+ outlinkCounter++;
+ }
+ }
+
+ // Overwrite the outlinks in ParseData with the normalized and
+ // filtered set
+ parseData.setOutlinks(outlinkList.toArray(new Outlink[outlinkList
+ .size()]));
+
+ output.collect(url, new NutchWritable(new ParseImpl(new ParseText(
+ parse.getText()), parseData, parse.isCanonical())));
+ }
+ }
+ } catch (IOException e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("fetcher caught:" + e.toString());
+ }
+ }
+
+ // return parse status if it exists
+ if (parseResult != null && !parseResult.isEmpty()) {
+ Parse p = parseResult.get(content.getUrl());
+ if (p != null) {
+ reporter.incrCounter("ParserStatus", ParseStatus.majorCodes[p
+ .getData().getStatus().getMajorCode()], 1);
+ return p.getData().getStatus();
+ }
+ }
+ return null;
+ }
+
+ private void updateStatus(int bytesInPage) throws IOException {
+ pages.incrementAndGet();
+ bytes.addAndGet(bytesInPage);
+ }
+
+ public synchronized void setHalted(boolean halted) {
+ this.halted = halted;
+ }
+
+ public synchronized boolean isHalted() {
+ return halted;
+ }
+
+ public String getReprUrl() {
+ return reprUrl;
+ }
+
+ private void setReprUrl(String urlString) {
+ this.reprUrl = urlString;
+ }
+
+}