This is an automated email from the ASF dual-hosted git repository.

markus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nutch.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ba50c0c6 NUTCH-3029 Host specific max. and min. intervals in adaptive scheduler
5ba50c0c6 is described below

commit 5ba50c0c6091a95818d3788f0d5b7c0ff49bec57
Author: Markus Jelsma <mar...@apache.org>
AuthorDate: Wed Mar 13 14:53:10 2024 +0000

    NUTCH-3029 Host specific max. and min. intervals in adaptive scheduler
---
 .../apache/nutch/crawl/AdaptiveFetchSchedule.java  | 159 ++++++++++++++++++++-
 1 file changed, 155 insertions(+), 4 deletions(-)

diff --git a/src/java/org/apache/nutch/crawl/AdaptiveFetchSchedule.java b/src/java/org/apache/nutch/crawl/AdaptiveFetchSchedule.java
index 5bccd4f30..a403d5649 100644
--- a/src/java/org/apache/nutch/crawl/AdaptiveFetchSchedule.java
+++ b/src/java/org/apache/nutch/crawl/AdaptiveFetchSchedule.java
@@ -22,11 +22,20 @@ import org.apache.hadoop.io.FloatWritable;
 import org.apache.nutch.crawl.CrawlDatum;
 import org.apache.nutch.metadata.Nutch;
 import org.apache.nutch.util.NutchConfiguration;
+import org.apache.commons.lang.StringUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Reader;
+import java.io.FileReader;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.util.Map;
+import java.util.HashMap;
 import java.lang.invoke.MethodHandles;
+import java.net.URI;
+import java.net.URISyntaxException;
 
 /**
  * This class implements an adaptive re-fetch algorithm. This works as follows:
@@ -79,9 +88,16 @@ public class AdaptiveFetchSchedule extends AbstractFetchSchedule {
 
   private double SYNC_DELTA_RATE;
 
+  private Configuration conf;
+
+  private Map<String,Float> hostSpecificMaxInterval = new HashMap<>();
+  
+  private Map<String,Float> hostSpecificMinInterval = new HashMap<>();
+
   @Override
   public void setConf(Configuration conf) {
     super.setConf(conf);
+    this.conf = conf;
     if (conf == null)
       return;
     INC_RATE = conf.getFloat("db.fetch.schedule.adaptive.inc_rate", 0.2f);
@@ -92,6 +108,136 @@ public class AdaptiveFetchSchedule extends AbstractFetchSchedule {
     SYNC_DELTA = conf.getBoolean("db.fetch.schedule.adaptive.sync_delta", true);
     SYNC_DELTA_RATE = conf.getFloat(
         "db.fetch.schedule.adaptive.sync_delta_rate", 0.2f);
+    try {
+      setHostSpecificIntervals("adaptive-host-specific-intervals.txt", 
+        MIN_INTERVAL, MAX_INTERVAL);
+    } catch (IOException e){
+      LOG.error("Failed reading the configuration file. ", e);
+    }
+  }
+
+  /**
+   * Load host-specific min_intervals and max_intervals
+   * from the configuration file into the HashMaps.
+   */
+  private void setHostSpecificIntervals(String fileName,
+    float defaultMin, float defaultMax) throws IOException {
+    Reader configReader = null;
+    configReader = conf.getConfResourceAsReader(fileName);
+    if (configReader == null) {
+      configReader = new FileReader(fileName);
+    }
+    BufferedReader reader = new BufferedReader(configReader);
+    String line;
+    int lineNo = 0;
+    while ((line = reader.readLine()) != null) {
+      lineNo++;
+      if (StringUtils.isNotBlank(line) && !line.startsWith("#")) {
+        line = line.trim();
+        String[] parts = line.split("\\s+");
+        if (parts.length == 3) {
+          // TODO: Maybe add host validation here?
+          // It might get computationally expensive for large files, though.
+          String host = parts[0].trim().toLowerCase();
+          String minInt = parts[1].trim();
+          String maxInt = parts[2].trim();
+          if (minInt.equalsIgnoreCase("default")){ minInt = "0"; }
+          if (maxInt.equalsIgnoreCase("default")){ maxInt = "0"; }
+          float m,M;
+          try {
+            m = Float.parseFloat(minInt);
+            M = Float.parseFloat(maxInt);
+
+            //negative values and mismatched boundaries are ignored
+            //(default to global settings)
+            if (m < 0 || M < 0 || m > M){
+              LOG.error("Improper fetch intervals given on line " + 
String.valueOf(lineNo)
+                + " in the config. file: " + line);
+            } else {
+
+              // min. interval should be positive and above the global minimum
+              if (m > 0 && m > defaultMin){
+                  hostSpecificMinInterval.put(host,m);
+                  LOG.debug("Added custom min. interval " + m + " for host " + 
host + ".");
+              } else if (m > 0) {
+                LOG.error("Min. interval out of bounds on line " + 
String.valueOf(lineNo)
+                  + " in the config. file: " + line);
+              }
+
+              // max. interval should be positive and below the global maximum
+              if (M > 0 && M < defaultMax){
+                hostSpecificMaxInterval.put(host,M);
+                LOG.debug("Added custom max. interval " + M + " for host " + 
host + ".");
+              } else if (M > 0){
+                LOG.error("Max. interval out of bounds on line " + 
String.valueOf(lineNo)
+                  + " in the config. file: " + line);
+              }
+
+              // zero values are ignored (default to global settings)
+            }
+          } catch (NumberFormatException e){
+            LOG.error("No proper fetch intervals given on line " + 
String.valueOf(lineNo)
+              + " in the config. file: " + line, e);
+          }
+        } else {
+          LOG.error("Malformed (domain, min_interval, max_interval) triplet on 
line "
+            + String.valueOf(lineNo) + " of the config. file: " + line);
+        }
+      }
+    }
+  }
+
+  /**
+   * Strip a URL, leaving only the host name.
+   */
+  public static String getHostName(String url) throws URISyntaxException {
+    URI uri = new URI(url);
+    String domain = uri.getHost();
+    return domain;
+  }
+
+  /**
+   * Returns the max_interval for this URL, which might depend on the host.
+   * @param  url  the URL to be scheduled
+   * @param  defaultMaxInterval  the value to which to default
+   * if max_interval has not been configured for this host
+   */
+  public float getMaxInterval(Text url, float defaultMaxInterval){
+    if (hostSpecificMaxInterval.isEmpty()) {
+      return defaultMaxInterval;
+    }
+    String host;
+    try {
+      host = getHostName(url.toString());
+    } catch (URISyntaxException e){
+      return defaultMaxInterval;
+    }
+    if (hostSpecificMaxInterval.containsKey(host)){
+      return hostSpecificMaxInterval.get(host);
+    }
+    return defaultMaxInterval;
+  }
+
+  /**
+   * Returns the min_interval for this URL, which might depend on the host.
+   * @param  url  the URL to be scheduled
+   * @param  defaultMinInterval  the value to which to default
+   * if min_interval has not been configured for this host
+   */
+  public float getMinInterval(Text url, float defaultMinInterval){
+    if (hostSpecificMinInterval.isEmpty()) {
+      return defaultMinInterval;
+    }
+    String host;
+    try {
+      host = getHostName(url.toString());
+    } catch (URISyntaxException e){
+      return defaultMinInterval;
+    }
+    if (hostSpecificMinInterval.containsKey(host)){
+      return hostSpecificMinInterval.get(host);
+    }
+    return defaultMinInterval;
   }
 
   @Override
@@ -133,10 +279,15 @@ public class AdaptiveFetchSchedule extends AbstractFetchSchedule {
           interval = delta;
         refTime = fetchTime - Math.round(delta * SYNC_DELTA_RATE * 1000);
       }
-      if (interval < MIN_INTERVAL) {
-        interval = MIN_INTERVAL;
-      } else if (interval > MAX_INTERVAL) {
-        interval = MAX_INTERVAL;
+
+      // replace min_interval and max_interval with host-specific ones,
+      // if so configured.
+      float newMaxInterval = getMaxInterval(url, MAX_INTERVAL);
+      float newMinInterval = getMinInterval(url, MIN_INTERVAL);
+      if (interval < newMinInterval) {
+        interval = newMinInterval;
+      } else if (interval > newMaxInterval) {
+        interval = newMaxInterval;
       }
     }
 
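For reference, the new setHostSpecificIntervals() method expects adaptive-host-specific-intervals.txt to contain one whitespace-separated (host, min_interval, max_interval) triplet per line, with comment lines starting with '#' in the first column. The sample below is purely illustrative (the host names are made up); the intervals are floats, presumably in seconds like the global db.fetch.schedule.adaptive.min_interval/max_interval settings, and a value of 0 or the keyword "default" keeps the corresponding global setting. A min. override is only accepted if it is larger than the global minimum, a max. override only if it is smaller than the global maximum.

# host              min_interval  max_interval
www.example.org     3600          604800
news.example.com    default       86400
static.example.net  86400         default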

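And a minimal usage sketch (not part of this commit) for the two new accessors, assuming the schedule is instantiated directly rather than through FetchScheduleFactory as in a normal crawl, and using a hypothetical URL; if the host has no configured override, or the URL cannot be parsed, the supplied defaults are returned unchanged:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.nutch.crawl.AdaptiveFetchSchedule;
import org.apache.nutch.util.NutchConfiguration;

public class HostIntervalCheck {
  public static void main(String[] args) {
    Configuration conf = NutchConfiguration.create();
    AdaptiveFetchSchedule schedule = new AdaptiveFetchSchedule();
    // setConf() also loads adaptive-host-specific-intervals.txt if it can be found
    // on the classpath or in the working directory; otherwise the maps stay empty.
    schedule.setConf(conf);

    Text url = new Text("https://www.example.org/page.html"); // hypothetical URL
    // Fall back to the defaults supplied here (60 s and one year) when no override exists.
    float min = schedule.getMinInterval(url, 60.0f);
    float max = schedule.getMaxInterval(url, 31536000.0f);
    System.out.println("min=" + min + ", max=" + max);
  }
}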