This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch NUTCH-3026
in repository https://gitbox.apache.org/repos/asf/nutch.git

commit 3a294709d7fc5e8324cca6ebd3de27164c154c23
Author: tballison <talli...@apache.org>
AuthorDate: Fri Nov 17 14:48:24 2023 -0500

    NUTCH-3026 -- first steps towards statusOnly option in IndexingJob
---
 .../org/apache/nutch/indexer/IndexerMapReduce.java | 140 ++++++++++++++++++++-
 src/java/org/apache/nutch/indexer/IndexingJob.java |  18 ++-
 2 files changed, 150 insertions(+), 8 deletions(-)

diff --git a/src/java/org/apache/nutch/indexer/IndexerMapReduce.java 
b/src/java/org/apache/nutch/indexer/IndexerMapReduce.java
index 1b8ff52eb..075c5965a 100644
--- a/src/java/org/apache/nutch/indexer/IndexerMapReduce.java
+++ b/src/java/org/apache/nutch/indexer/IndexerMapReduce.java
@@ -20,7 +20,10 @@ import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.Collection;
 import java.util.Locale;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
+import org.apache.nutch.parse.ParseStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.commons.codec.binary.Base64;
@@ -64,7 +67,7 @@ import org.apache.nutch.scoring.ScoringFilters;
  * </p>
  * <p>
  * See
- * {@link #initMRJob(Path, Path, Collection, Job, boolean)}
+ * {@link #initMRJob(Path, Path, Collection, Job, boolean, boolean)}
  * for details on the specific data structures and parameters required for
  * indexing.
  * </p>
@@ -78,6 +81,8 @@ public class IndexerMapReduce extends Configured {
   public static final String INDEXER_PARAMS = "indexer.additional.params";
   public static final String INDEXER_DELETE = "indexer.delete";
   public static final String INDEXER_NO_COMMIT = "indexer.nocommit";
+
+  public static final String INDEXER_STATUS_ONLY = "indexer.statusonly";
   public static final String INDEXER_DELETE_ROBOTS_NOINDEX = 
"indexer.delete.robots.noindex";
   public static final String INDEXER_DELETE_SKIPPED = 
"indexer.delete.skipped.by.indexingfilter";
   public static final String INDEXER_SKIP_NOTMODIFIED = 
"indexer.skip.notmodified";
@@ -203,6 +208,7 @@ public class IndexerMapReduce extends Configured {
     private boolean deleteRobotsNoIndex = false;
     private boolean deleteSkippedByIndexingFilter = false;
     private boolean base64 = false;
+    private boolean statusOnly = false;
     private IndexingFilters filters;
     private ScoringFilters scfilters;
 
@@ -226,6 +232,7 @@ public class IndexerMapReduce extends Configured {
           false);
       skip = conf.getBoolean(INDEXER_SKIP_NOTMODIFIED, false);
       base64 = conf.getBoolean(INDEXER_BINARY_AS_BASE64, false);
+      statusOnly = conf.getBoolean(INDEXER_STATUS_ONLY, false);
 
       normalize = conf.getBoolean(URL_NORMALIZING, false);
       filter = conf.getBoolean(URL_FILTERING, false);
@@ -251,7 +258,8 @@ public class IndexerMapReduce extends Configured {
       ParseText parseText = null;
 
       for (NutchWritable val : values) {
-        final Writable value = val.get(); // unwrap
+        final Writable value = val.get();// unwrap
+
         if (value instanceof Inlinks) {
           inlinks = (Inlinks) value;
         } else if (value instanceof CrawlDatum) {
@@ -295,6 +303,10 @@ public class IndexerMapReduce extends Configured {
           LOG.warn("Unrecognized type: {}", value.getClass());
         }
       }
+      if (statusOnly) {
+        reduceStatusOnly(key, context, inlinks, dbDatum, fetchDatum, content, 
parseData, parseText);
+        return;
+      }
 
       // Whether to delete GONE or REDIRECTS
       if (delete && fetchDatum != null) {
@@ -429,10 +441,130 @@ public class IndexerMapReduce extends Configured {
       NutchIndexAction action = new NutchIndexAction(doc, 
NutchIndexAction.ADD);
       context.write(key, action);
     }
+
+    private void reduceStatusOnly(Text key, Reducer.Context context, Inlinks 
inlinks, CrawlDatum dbDatum,
+                                  CrawlDatum fetchDatum, Content content,
+                                  ParseData parseData, ParseText parseText)
+            throws IOException, InterruptedException {
+
+      NutchDocument doc = new NutchDocument();
+      doc.add("id", key.toString());
+      updateDbDatum(dbDatum, doc);
+      updateFetchDatum(fetchDatum, doc);
+      updateContent(key, content, doc);
+      doc = updateParse(key, parseData, parseText, fetchDatum, inlinks, doc);
+      LOG.info("processing status for: {}", key.toString());
+
+      context.getCounter("IndexerStatus", "indexed (add/update)").increment(1);
+      NutchIndexAction action = new NutchIndexAction(doc, 
NutchIndexAction.ADD);
+      context.write(key, action);
+    }
+
+    private NutchDocument updateParse(Text key, ParseData parseData, ParseText 
parseText,
+                             CrawlDatum fetchDatum, Inlinks inlinks, 
NutchDocument doc) {
+      if (parseData == null) {
+        return doc;
+      }
+      ParseStatus status = parseData.getStatus();
+      String parseStatusMajorName = "UNKNOWN!";
+      if (status.getMajorCode() >= 0 &&
+              status.getMajorCode() < ParseStatus.majorCodes.length) {
+        parseStatusMajorName = ParseStatus.majorCodes[status.getMajorCode()];
+      }
+      if (status.getMessage() != null && status.getMessage().trim().length() > 
0) {
+        doc.add("nutch.parse.message", status.getMessage());
+      }
+      doc.add("nutch.parse.status", parseStatusMajorName);
+      //TODO: add minor status
+
+      // add segment, used to map from merged index back to segment files
+      doc.add("nutch.segment", 
parseData.getContentMeta().get(Nutch.SEGMENT_NAME_KEY));
+
+      // add digest, used by dedup
+      doc.add("nutch.digest", 
parseData.getContentMeta().get(Nutch.SIGNATURE_KEY));
+
+      if (parseData != null && parseText != null) {
+        final Parse parse = new ParseImpl(parseText, parseData);
+        try {
+          doc = filters.filter(doc, parse, key, fetchDatum, inlinks);
+          //remove the content
+          doc.removeField("content");
+          return doc;
+        } catch (IndexingException e) {
+          if (LOG.isWarnEnabled()) {
+            LOG.warn("Error filtering " + key + ": ", e);
+          }
+          //context.getCounter("IndexerStatus", "errors 
(IndexingFilter)").increment(1);
+        }
+      }
+      return doc;
+    }
+
+    private int getStatus(Metadata metadata) {
+      if (metadata.get("_response.headers_") == null) {
+        return -1;
+      }
+      String[] headers = metadata.get("_response.headers_").split("\r\n");
+      String firstHeader = headers[0];
+
+      Matcher m = 
Pattern.compile("\\A(\\d\\d\\d)\\Z").matcher(firstHeader.trim());
+      if (m.find()) {
+        return Integer.parseInt(m.group(1));
+      }
+      return -1;
+    }
+
+    private void updateContent(Text key, Content content, NutchDocument doc) {
+      if (content == null) {
+        return;
+      }
+      String contentType = content.getContentType();
+      doc.add("nutch.content.content-type", contentType);
+      if (content.getContent() != null) {
+        doc.add("nutch.content.length", 
Long.toString(content.getContent().length));
+      }
+      Metadata nutchMetadata = content.getMetadata();
+      int statusCode = getStatus(nutchMetadata);
+      if (statusCode > -1) {
+        doc.add("nutch.http.status", statusCode);
+      }
+      /*
+      //TODO -- do we want to add all of this?
+      //Dangerous to pass through http headers without prefixing them
+      //or sanitizing them?
+      if (nutchMetadata != null) {
+
+        for (String n : nutchMetadata.names()) {
+          String[] vals = nutchMetadata.getValues(n);
+          if (vals.length == 1) {
+            doc.add(n, vals[0]);
+          } else {
+            doc.add(n, nutchMetadata.getValues(n));
+          }
+        }
+      }*/
+    }
+
+    private void updateFetchDatum(CrawlDatum fetchDatum, NutchDocument doc) {
+      if (fetchDatum == null) {
+        return;
+      }
+      String fetchStatus = fetchDatum.getStatusName(fetchDatum.getStatus());
+      doc.add("nutch.fetch.status", fetchStatus);
+    }
+
+    private void updateDbDatum(CrawlDatum dbDatum, NutchDocument doc) {
+      if (dbDatum == null) {
+        return;
+      }
+      String dbStatus = dbDatum.getStatusName(dbDatum.getStatus());
+      doc.add("nutch.db.status", dbStatus);
+    }
+
   }
 
   public static void initMRJob(Path crawlDb, Path linkDb,
-      Collection<Path> segments, Job job, boolean addBinaryContent) throws 
IOException{
+      Collection<Path> segments, Job job, boolean addBinaryContent, boolean 
statusOnly) throws IOException{
 
     Configuration conf = job.getConfiguration();
 
@@ -463,7 +595,7 @@ public class IndexerMapReduce extends Configured {
       FileInputFormat.addInputPath(job, new Path(segment, ParseData.DIR_NAME));
       FileInputFormat.addInputPath(job, new Path(segment, ParseText.DIR_NAME));
 
-      if (addBinaryContent) {
+      if (addBinaryContent || statusOnly) {
         FileInputFormat.addInputPath(job, new Path(segment, Content.DIR_NAME));
       }
     }
diff --git a/src/java/org/apache/nutch/indexer/IndexingJob.java 
b/src/java/org/apache/nutch/indexer/IndexingJob.java
index c3ddb4ae9..786f7f406 100644
--- a/src/java/org/apache/nutch/indexer/IndexingJob.java
+++ b/src/java/org/apache/nutch/indexer/IndexingJob.java
@@ -96,13 +96,14 @@ public class IndexingJob extends NutchTool implements Tool {
       boolean filter, boolean normalize, boolean addBinaryContent)
       throws IOException, InterruptedException, ClassNotFoundException {
     index(crawlDb, linkDb, segments, noCommit, deleteGone, params, false,
-        false, false, false);
+        false, false, false, false);
   }
 
   public void index(Path crawlDb, Path linkDb, List<Path> segments,
       boolean noCommit, boolean deleteGone, String params,
       boolean filter, boolean normalize, boolean addBinaryContent,
-      boolean base64) throws IOException, InterruptedException, 
ClassNotFoundException {
+      boolean base64, boolean statusOnly) throws IOException, 
InterruptedException,
+          ClassNotFoundException {
 
     StopWatch stopWatch = new StopWatch();
     stopWatch.start();
@@ -116,6 +117,7 @@ public class IndexingJob extends NutchTool implements Tool {
     LOG.info("Indexer: deleting gone documents: {}", deleteGone);
     LOG.info("Indexer: URL filtering: {}", filter);
     LOG.info("Indexer: URL normalizing: {}", normalize);
+    LOG.info("Indexer: status only: {}", statusOnly);
     if (addBinaryContent) {
       if (base64) {
         LOG.info("Indexer: adding binary content as Base64");
@@ -124,13 +126,15 @@ public class IndexingJob extends NutchTool implements 
Tool {
       }
     }
 
-    IndexerMapReduce.initMRJob(crawlDb, linkDb, segments, job, 
addBinaryContent);
+    IndexerMapReduce.initMRJob(crawlDb, linkDb, segments, job,
+            addBinaryContent, statusOnly);
 
     conf.setBoolean(IndexerMapReduce.INDEXER_DELETE, deleteGone);
     conf.setBoolean(IndexerMapReduce.URL_FILTERING, filter);
     conf.setBoolean(IndexerMapReduce.URL_NORMALIZING, normalize);
     conf.setBoolean(IndexerMapReduce.INDEXER_BINARY_AS_BASE64, base64);
     conf.setBoolean(IndexerMapReduce.INDEXER_NO_COMMIT, noCommit);
+    conf.setBoolean(IndexerMapReduce.INDEXER_STATUS_ONLY, statusOnly);
 
     if (params != null) {
       conf.set(IndexerMapReduce.INDEXER_PARAMS, params);
@@ -209,6 +213,8 @@ public class IndexingJob extends NutchTool implements Tool {
     System.err.println(
         "\t-addBinaryContent\tindex raw/binary content in field 
`binaryContent`");
     System.err.println("\t-base64   \tuse Base64 encoding for binary content");
+    System.err.println("\t-statusOnly   \tindex the status of all urls in the 
crawldb and skip " +
+            "the content");
     System.err.println("");
   }
 
@@ -233,6 +239,7 @@ public class IndexingJob extends NutchTool implements Tool {
     boolean normalize = false;
     boolean addBinaryContent = false;
     boolean base64 = false;
+    boolean statusOnly = false;
 
     for (int i = 0; i < args.length; i++) {
       FileSystem fs = null;
@@ -272,6 +279,8 @@ public class IndexingJob extends NutchTool implements Tool {
          * given
          */
         crawlDb = new Path(args[i]);
+      } else if (args[i].equals("-statusOnly")) {
+        statusOnly = true;
       } else {
         // remaining arguments are segments
         dir = new Path(args[i]);
@@ -289,7 +298,8 @@ public class IndexingJob extends NutchTool implements Tool {
     }
 
     try {
-      index(crawlDb, linkDb, segments, noCommit, deleteGone, params, filter, 
normalize, addBinaryContent, base64);
+      index(crawlDb, linkDb, segments, noCommit, deleteGone, params, filter, 
normalize,
+              addBinaryContent, base64, statusOnly);
       return 0;
     } catch (final Exception e) {
       LOG.error("Indexer: {}", StringUtils.stringifyException(e));

Reply via email to