This is an automated email from the ASF dual-hosted git repository.
markus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nutch.git
The following commit(s) were added to refs/heads/master by this push:
new 5ba50c0c6 NUTCH-3029 Host specific max. and min. intervals in adaptive
scheduler
5ba50c0c6 is described below
commit 5ba50c0c6091a95818d3788f0d5b7c0ff49bec57
Author: Markus Jelsma <[email protected]>
AuthorDate: Wed Mar 13 14:53:10 2024 +0000
NUTCH-3029 Host specific max. and min. intervals in adaptive scheduler
---
.../apache/nutch/crawl/AdaptiveFetchSchedule.java | 159 ++++++++++++++++++++-
1 file changed, 155 insertions(+), 4 deletions(-)
diff --git a/src/java/org/apache/nutch/crawl/AdaptiveFetchSchedule.java
b/src/java/org/apache/nutch/crawl/AdaptiveFetchSchedule.java
index 5bccd4f30..a403d5649 100644
--- a/src/java/org/apache/nutch/crawl/AdaptiveFetchSchedule.java
+++ b/src/java/org/apache/nutch/crawl/AdaptiveFetchSchedule.java
@@ -22,11 +22,20 @@ import org.apache.hadoop.io.FloatWritable;
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.metadata.Nutch;
import org.apache.nutch.util.NutchConfiguration;
+import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.Reader;
+import java.io.FileReader;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.util.Map;
+import java.util.HashMap;
import java.lang.invoke.MethodHandles;
+import java.net.URI;
+import java.net.URISyntaxException;
/**
* This class implements an adaptive re-fetch algorithm. This works as follows:
@@ -79,9 +88,16 @@ public class AdaptiveFetchSchedule extends
AbstractFetchSchedule {
private double SYNC_DELTA_RATE;
+ private Configuration conf;
+
+ private Map<String,Float> hostSpecificMaxInterval = new HashMap<>();
+
+ private Map<String,Float> hostSpecificMinInterval = new HashMap<>();
+
@Override
public void setConf(Configuration conf) {
super.setConf(conf);
+ this.conf = conf;
if (conf == null)
return;
INC_RATE = conf.getFloat("db.fetch.schedule.adaptive.inc_rate", 0.2f);
@@ -92,6 +108,136 @@ public class AdaptiveFetchSchedule extends
AbstractFetchSchedule {
SYNC_DELTA = conf.getBoolean("db.fetch.schedule.adaptive.sync_delta",
true);
SYNC_DELTA_RATE = conf.getFloat(
"db.fetch.schedule.adaptive.sync_delta_rate", 0.2f);
+ try {
+ setHostSpecificIntervals("adaptive-host-specific-intervals.txt",
+ MIN_INTERVAL, MAX_INTERVAL);
+ } catch (IOException e){
+ LOG.error("Failed reading the configuration file. ", e);
+ }
+ }
+
+ /**
+  * Load host-specific min_intervals and max_intervals
+  * from the configuration file into the HashMaps.
+  * <p>
+  * Every line that is neither blank nor a {@code #} comment must hold a
+  * whitespace-separated triplet {@code host min_interval max_interval}.
+  * An interval given as {@code default} or {@code 0} stores no
+  * host-specific value, so the global setting keeps applying to that host.
+  * Invalid lines are logged and skipped.
+  *
+  * @param fileName   configuration resource to read; if the configuration
+  *                   yields no reader for it, it is opened as a plain file
+  * @param defaultMin global minimum; a host-specific minimum is only
+  *                   stored when it lies above this value
+  * @param defaultMax global maximum; a host-specific maximum is only
+  *                   stored when it lies below this value
+  * @throws IOException if the file cannot be opened or read
+  */
+ private void setHostSpecificIntervals(String fileName,
+     float defaultMin, float defaultMax) throws IOException {
+   Reader configReader = conf.getConfResourceAsReader(fileName);
+   if (configReader == null) {
+     configReader = new FileReader(fileName);
+   }
+   // Closing the BufferedReader also closes the wrapped configReader.
+   try (BufferedReader reader = new BufferedReader(configReader)) {
+     String line;
+     int lineNo = 0;
+     while ((line = reader.readLine()) != null) {
+       lineNo++;
+       if (!StringUtils.isNotBlank(line)) {
+         continue;
+       }
+       // Trim before the comment test so that indented comments are
+       // skipped instead of being reported as malformed triplets.
+       line = line.trim();
+       if (line.startsWith("#")) {
+         continue;
+       }
+       addHostSpecificIntervals(line, lineNo, defaultMin, defaultMax);
+     }
+   }
+ }
+
+ /**
+  * Parse one trimmed, non-comment configuration line and store the
+  * intervals it defines; problems are logged and the line is skipped.
+  */
+ private void addHostSpecificIntervals(String line, int lineNo,
+     float defaultMin, float defaultMax) {
+   String[] parts = line.split("\\s+");
+   if (parts.length != 3) {
+     LOG.error("Malformed (domain, min_interval, max_interval) triplet on line {}"
+         + " of the config. file: {}", lineNo, line);
+     return;
+   }
+
+   // TODO: Maybe add host validation here?
+   // It might get computationally expensive for large files, though.
+   // Locale.ROOT keeps the key independent of the JVM default locale.
+   String host = parts[0].toLowerCase(java.util.Locale.ROOT);
+   String minInt = parts[1].equalsIgnoreCase("default") ? "0" : parts[1];
+   String maxInt = parts[2].equalsIgnoreCase("default") ? "0" : parts[2];
+
+   float min;
+   float max;
+   try {
+     min = Float.parseFloat(minInt);
+     max = Float.parseFloat(maxInt);
+   } catch (NumberFormatException e) {
+     LOG.error("No proper fetch intervals given on line {}"
+         + " in the config. file: {}", lineNo, line, e);
+     return;
+   }
+
+   // Negative values and mismatched boundaries are ignored (default to
+   // global settings). Zero means "default", so min is only compared with
+   // max when a max was actually given; otherwise "host 3600 default"
+   // would be rejected.
+   if (min < 0 || max < 0 || (max > 0 && min > max)) {
+     LOG.error("Improper fetch intervals given on line {}"
+         + " in the config. file: {}", lineNo, line);
+     return;
+   }
+
+   // min. interval should be positive and above the global minimum
+   if (min > 0 && min > defaultMin) {
+     hostSpecificMinInterval.put(host, min);
+     LOG.debug("Added custom min. interval {} for host {}.", min, host);
+   } else if (min > 0) {
+     LOG.error("Min. interval out of bounds on line {}"
+         + " in the config. file: {}", lineNo, line);
+   }
+
+   // max. interval should be positive and below the global maximum
+   if (max > 0 && max < defaultMax) {
+     hostSpecificMaxInterval.put(host, max);
+     LOG.debug("Added custom max. interval {} for host {}.", max, host);
+   } else if (max > 0) {
+     LOG.error("Max. interval out of bounds on line {}"
+         + " in the config. file: {}", lineNo, line);
+   }
+
+   // zero values are ignored (default to global settings)
+ }
+
+ /**
+  * Extract the host component of a URL.
+  *
+  * @param url the URL string to parse
+  * @return the host as reported by {@link URI#getHost()}; this is
+  *         {@code null} when the URI defines no host
+  * @throws URISyntaxException if {@code url} cannot be parsed as a URI
+  */
+ public static String getHostName(String url) throws URISyntaxException {
+   return new URI(url).getHost();
+ }
+
+ /**
+  * Returns the max_interval for this URL, which might depend on the host.
+  * @param url the URL to be scheduled
+  * @param defaultMaxInterval the value to which to default
+  * if max_interval has not been configured for this host
+  * @return the host-specific max_interval, or {@code defaultMaxInterval}
+  * if none is configured, the URL cannot be parsed, or it has no host
+  */
+ public float getMaxInterval(Text url, float defaultMaxInterval){
+   if (hostSpecificMaxInterval.isEmpty()) {
+     return defaultMaxInterval;
+   }
+   String host;
+   try {
+     host = getHostName(url.toString());
+   } catch (URISyntaxException e){
+     return defaultMaxInterval;
+   }
+   // URI.getHost() yields null when the URI defines no host
+   if (host == null) {
+     return defaultMaxInterval;
+   }
+   // The map keys are lower-cased when the configuration file is loaded,
+   // so the lookup key has to be lower-cased as well.
+   Float maxInterval =
+       hostSpecificMaxInterval.get(host.toLowerCase(java.util.Locale.ROOT));
+   return maxInterval != null ? maxInterval : defaultMaxInterval;
+ }
+
+ /**
+  * Returns the min_interval for this URL, which might depend on the host.
+  * @param url the URL to be scheduled
+  * @param defaultMinInterval the value to which to default
+  * if min_interval has not been configured for this host
+  * @return the host-specific min_interval, or {@code defaultMinInterval}
+  * if none is configured, the URL cannot be parsed, or it has no host
+  */
+ public float getMinInterval(Text url, float defaultMinInterval){
+   if (hostSpecificMinInterval.isEmpty()) {
+     return defaultMinInterval;
+   }
+   String host;
+   try {
+     host = getHostName(url.toString());
+   } catch (URISyntaxException e){
+     return defaultMinInterval;
+   }
+   // URI.getHost() yields null when the URI defines no host
+   if (host == null) {
+     return defaultMinInterval;
+   }
+   // The map keys are lower-cased when the configuration file is loaded,
+   // so the lookup key has to be lower-cased as well.
+   Float minInterval =
+       hostSpecificMinInterval.get(host.toLowerCase(java.util.Locale.ROOT));
+   return minInterval != null ? minInterval : defaultMinInterval;
  }
@Override
@@ -133,10 +279,15 @@ public class AdaptiveFetchSchedule extends
AbstractFetchSchedule {
interval = delta;
refTime = fetchTime - Math.round(delta * SYNC_DELTA_RATE * 1000);
}
- if (interval < MIN_INTERVAL) {
- interval = MIN_INTERVAL;
- } else if (interval > MAX_INTERVAL) {
- interval = MAX_INTERVAL;
+
+ // replace min_interval and max_interval with domain-specific ones,
+ // if so configured.
+ float newMaxInterval = getMaxInterval(url, MAX_INTERVAL);
+ float newMinInterval = getMinInterval(url, MIN_INTERVAL);
+ if (interval < newMinInterval) {
+ interval = newMinInterval;
+ } else if (interval > newMaxInterval) {
+ interval = newMaxInterval;
}
}