This is an automated email from the ASF dual-hosted git repository. markus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nutch.git
The following commit(s) were added to refs/heads/master by this push: new 5ba50c0c6 NUTCH-3029 Host specific max. and min. intervals in adaptive scheduler 5ba50c0c6 is described below commit 5ba50c0c6091a95818d3788f0d5b7c0ff49bec57 Author: Markus Jelsma <mar...@apache.org> AuthorDate: Wed Mar 13 14:53:10 2024 +0000 NUTCH-3029 Host specific max. and min. intervals in adaptive scheduler --- .../apache/nutch/crawl/AdaptiveFetchSchedule.java | 159 ++++++++++++++++++++- 1 file changed, 155 insertions(+), 4 deletions(-) diff --git a/src/java/org/apache/nutch/crawl/AdaptiveFetchSchedule.java b/src/java/org/apache/nutch/crawl/AdaptiveFetchSchedule.java index 5bccd4f30..a403d5649 100644 --- a/src/java/org/apache/nutch/crawl/AdaptiveFetchSchedule.java +++ b/src/java/org/apache/nutch/crawl/AdaptiveFetchSchedule.java @@ -22,11 +22,20 @@ import org.apache.hadoop.io.FloatWritable; import org.apache.nutch.crawl.CrawlDatum; import org.apache.nutch.metadata.Nutch; import org.apache.nutch.util.NutchConfiguration; +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Reader; +import java.io.FileReader; +import java.io.BufferedReader; +import java.io.IOException; +import java.util.Map; +import java.util.HashMap; import java.lang.invoke.MethodHandles; +import java.net.URI; +import java.net.URISyntaxException; /** * This class implements an adaptive re-fetch algorithm. 
This works as follows: @@ -79,9 +88,16 @@ public class AdaptiveFetchSchedule extends AbstractFetchSchedule { private double SYNC_DELTA_RATE; + private Configuration conf; + + private Map<String,Float> hostSpecificMaxInterval = new HashMap<>(); + + private Map<String,Float> hostSpecificMinInterval = new HashMap<>(); + @Override public void setConf(Configuration conf) { super.setConf(conf); + this.conf = conf; if (conf == null) return; INC_RATE = conf.getFloat("db.fetch.schedule.adaptive.inc_rate", 0.2f); @@ -92,6 +108,136 @@ public class AdaptiveFetchSchedule extends AbstractFetchSchedule { SYNC_DELTA = conf.getBoolean("db.fetch.schedule.adaptive.sync_delta", true); SYNC_DELTA_RATE = conf.getFloat( "db.fetch.schedule.adaptive.sync_delta_rate", 0.2f); + try { + setHostSpecificIntervals("adaptive-host-specific-intervals.txt", + MIN_INTERVAL, MAX_INTERVAL); + } catch (IOException e){ + LOG.error("Failed reading the configuration file. ", e); + } + } + + /** + * Load host-specific min_intervals and max_intervals + * from the configuration file into the HashMaps. + */ + private void setHostSpecificIntervals(String fileName, + float defaultMin, float defaultMax) throws IOException { + Reader configReader = null; + configReader = conf.getConfResourceAsReader(fileName); + if (configReader == null) { + configReader = new FileReader(fileName); + } + BufferedReader reader = new BufferedReader(configReader); + String line; + int lineNo = 0; + while ((line = reader.readLine()) != null) { + lineNo++; + if (StringUtils.isNotBlank(line) && !line.startsWith("#")) { + line = line.trim(); + String[] parts = line.split("\\s+"); + if (parts.length == 3) { + // TODO: Maybe add host validation here? + // It might get computationally expensive for large files, though. 
+ String host = parts[0].trim().toLowerCase(); + String minInt = parts[1].trim(); + String maxInt = parts[2].trim(); + if (minInt.equalsIgnoreCase("default")){ minInt = "0"; } + if (maxInt.equalsIgnoreCase("default")){ maxInt = "0"; } + float m,M; + try { + m = Float.parseFloat(minInt); + M = Float.parseFloat(maxInt); + + //negative values and mismatched boundaries are ignored + //(default to global settings) + if (m < 0 || M < 0 || m > M){ + LOG.error("Improper fetch intervals given on line " + String.valueOf(lineNo) + + " in the config. file: " + line); + } else { + + // min. interval should be positive and above the global minimum + if (m > 0 && m > defaultMin){ + hostSpecificMinInterval.put(host,m); + LOG.debug("Added custom min. interval " + m + " for host " + host + "."); + } else if (m > 0) { + LOG.error("Min. interval out of bounds on line " + String.valueOf(lineNo) + + " in the config. file: " + line); + } + + // max. interval should be positive and below the global maximum + if (M > 0 && M < defaultMax){ + hostSpecificMaxInterval.put(host,M); + LOG.debug("Added custom max. interval " + M + " for host " + host + "."); + } else if (M > 0){ + LOG.error("Max. interval out of bounds on line " + String.valueOf(lineNo) + + " in the config. file: " + line); + } + + // zero values are ignored (default to global settings) + } + } catch (NumberFormatException e){ + LOG.error("No proper fetch intervals given on line " + String.valueOf(lineNo) + + " in the config. file: " + line, e); + } + } else { + LOG.error("Malformed (domain, min_interval, max_interval) triplet on line " + + String.valueOf(lineNo) + " of the config. file: " + line); + } + } + } + } + + /** + * Strip a URL, leaving only the host name. + */ + public static String getHostName(String url) throws URISyntaxException { + URI uri = new URI(url); + String domain = uri.getHost(); + return domain; + } + + /** + * Returns the max_interval for this URL, which might depend on the host. 
+ * @param url the URL to be scheduled + * @param defaultMaxInterval the value to which to default + * if max_interval has not been configured for this host + */ + public float getMaxInterval(Text url, float defaultMaxInterval){ + if (hostSpecificMaxInterval.isEmpty()) { + return defaultMaxInterval; + } + String host; + try { + host = getHostName(url.toString()); + } catch (URISyntaxException e){ + return defaultMaxInterval; + } + if (hostSpecificMaxInterval.containsKey(host)){ + return hostSpecificMaxInterval.get(host); + } + return defaultMaxInterval; + } + + /** + * Returns the min_interval for this URL, which might depend on the host. + * @param url the URL to be scheduled + * @param defaultMinInterval the value to which to default + * if min_interval has not been configured for this host + */ + public float getMinInterval(Text url, float defaultMinInterval){ + if (hostSpecificMinInterval.isEmpty()) { + return defaultMinInterval; + } + String host; + try { + host = getHostName(url.toString()); + } catch (URISyntaxException e){ + return defaultMinInterval; + } + if (hostSpecificMinInterval.containsKey(host)){ + return hostSpecificMinInterval.get(host); + } + return defaultMinInterval; } @Override @@ -133,10 +279,15 @@ public class AdaptiveFetchSchedule extends AbstractFetchSchedule { interval = delta; refTime = fetchTime - Math.round(delta * SYNC_DELTA_RATE * 1000); } - if (interval < MIN_INTERVAL) { - interval = MIN_INTERVAL; - } else if (interval > MAX_INTERVAL) { - interval = MAX_INTERVAL; + + // replace min_interval and max_interval with domain-specific ones, + // if so configured. + float newMaxInterval = getMaxInterval(url, MAX_INTERVAL); + float newMinInterval = getMinInterval(url, MIN_INTERVAL); + if (interval < newMinInterval) { + interval = newMinInterval; + } else if (interval > newMaxInterval) { + interval = newMaxInterval; } }