This is an automated email from the ASF dual-hosted git repository.

snagel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nutch.git


The following commit(s) were added to refs/heads/master by this push:
     new 86b893a71 NUTCH-3079 Dumping a segment fails unless it has been fetched and parsed
86b893a71 is described below

commit 86b893a717260eb5078120387fa223eb5cc88794
Author: Sebastian Nagel <[email protected]>
AuthorDate: Sun Oct 27 21:01:58 2024 +0100

    NUTCH-3079 Dumping a segment fails unless it has been fetched and parsed
    
    SegmentReader's dump and get tools now check whether a segment
    subdirectory exists before adding it as input. A warning is shown
    if a subdirectory does not exist and has not been excluded via one
    of the general options (-nogenerate, etc.).
---
 .../org/apache/nutch/segment/SegmentReader.java    | 96 ++++++++++++++++------
 1 file changed, 69 insertions(+), 27 deletions(-)

diff --git a/src/java/org/apache/nutch/segment/SegmentReader.java b/src/java/org/apache/nutch/segment/SegmentReader.java
index bef980060..b2073f6fc 100644
--- a/src/java/org/apache/nutch/segment/SegmentReader.java
+++ b/src/java/org/apache/nutch/segment/SegmentReader.java
@@ -90,7 +90,7 @@ public class SegmentReader extends Configured implements Tool {
 
   public static class InputCompatMapper extends
       Mapper<WritableComparable<?>, Writable, Text, NutchWritable> {
-    
+
     private Text newKey = new Text();
 
     @Override
@@ -195,6 +195,28 @@ public class SegmentReader extends Configured implements Tool {
     }
   }
 
+  private static boolean segmSubdirExists(Configuration conf, Path segment,
+      String subDir) throws IOException {
+    Path segmSubPath = new Path(segment, subDir);
+    boolean exists = segmSubPath.getFileSystem(conf).exists(segmSubPath);
+    if (!exists) {
+      LOG.warn("Segment subdirectory {} does not exist in {}!", subDir,
+          segment);
+    }
+    return exists;
+  }
+
+  private static void addSegmSubDirIfExists(List<Path> inputDirs, Configuration conf,
+      Path segment, String subDir) throws IOException {
+    Path segmSubPath = new Path(segment, subDir);
+    if (segmSubPath.getFileSystem(conf).exists(segmSubPath)) {
+      inputDirs.add(segmSubPath);
+    } else {
+      LOG.warn("Segment subdirectory {} does not exist in {} - skipping!", subDir,
+          segment);
+    }
+  }
+
   public void dump(Path segment, Path output) throws IOException,
       InterruptedException, ClassNotFoundException {
 
@@ -203,21 +225,36 @@ public class SegmentReader extends Configured implements Tool {
     Job job = Job.getInstance(getConf(), "Nutch SegmentReader: " + segment);
     Configuration conf = job.getConfiguration();
 
-    if (ge)
-      FileInputFormat.addInputPath(job, new Path(segment,
-          CrawlDatum.GENERATE_DIR_NAME));
-    if (fe)
-      FileInputFormat.addInputPath(job, new Path(segment,
-          CrawlDatum.FETCH_DIR_NAME));
-    if (pa)
-      FileInputFormat.addInputPath(job, new Path(segment,
-          CrawlDatum.PARSE_DIR_NAME));
-    if (co)
-      FileInputFormat.addInputPath(job, new Path(segment, Content.DIR_NAME));
-    if (pd)
-      FileInputFormat.addInputPath(job, new Path(segment, ParseData.DIR_NAME));
-    if (pt)
-      FileInputFormat.addInputPath(job, new Path(segment, ParseText.DIR_NAME));
+    List<Path> inputDirs = new ArrayList<>();
+    if (ge) {
+      addSegmSubDirIfExists(inputDirs, conf, segment,
+          CrawlDatum.GENERATE_DIR_NAME);
+    }
+    if (fe) {
+      addSegmSubDirIfExists(inputDirs, conf, segment,
+          CrawlDatum.FETCH_DIR_NAME);
+    }
+    if (pa) {
+      addSegmSubDirIfExists(inputDirs, conf, segment,
+          CrawlDatum.PARSE_DIR_NAME);
+    }
+    if (co) {
+      addSegmSubDirIfExists(inputDirs, conf, segment, Content.DIR_NAME);
+    }
+    if (pd) {
+      addSegmSubDirIfExists(inputDirs, conf, segment, ParseData.DIR_NAME);
+    }
+    if (pt) {
+      addSegmSubDirIfExists(inputDirs, conf, segment, ParseText.DIR_NAME);
+    }
+    if (inputDirs.isEmpty()) {
+      String msg = "No segment subdirectories defined as input";
+      LOG.error(msg);
+      throw new RuntimeException(msg);
+    }
+    for (Path p : inputDirs) {
+      FileInputFormat.addInputPath(job, p);
+    }
 
     job.setInputFormatClass(SequenceFileInputFormat.class);
     job.setMapperClass(InputCompatMapper.class);
@@ -243,7 +280,7 @@ public class SegmentReader extends Configured implements Tool {
       }
     } catch (IOException | InterruptedException | ClassNotFoundException e ){
       LOG.error(StringUtils.stringifyException(e));
-      throw e; 
+      throw e;
     }
 
     // concatenate the output
@@ -307,7 +344,7 @@ public class SegmentReader extends Configured implements Tool {
       final Map<String, List<Writable>> results) throws Exception {
     LOG.info("SegmentReader: get '{}'", key);
     ArrayList<Thread> threads = new ArrayList<>();
-    if (co)
+    if (co && segmSubdirExists(getConf(), segment, Content.DIR_NAME))
       threads.add(new Thread() {
         @Override
         public void run() {
@@ -320,7 +357,7 @@ public class SegmentReader extends Configured implements Tool {
           }
         }
       });
-    if (fe)
+    if (fe && segmSubdirExists(getConf(), segment, CrawlDatum.FETCH_DIR_NAME))
       threads.add(new Thread() {
         @Override
         public void run() {
@@ -333,7 +370,8 @@ public class SegmentReader extends Configured implements Tool {
           }
         }
       });
-    if (ge)
+    if (ge
+        && segmSubdirExists(getConf(), segment, CrawlDatum.GENERATE_DIR_NAME))
       threads.add(new Thread() {
         @Override
         public void run() {
@@ -346,7 +384,7 @@ public class SegmentReader extends Configured implements Tool {
           }
         }
       });
-    if (pa)
+    if (pa && segmSubdirExists(getConf(), segment, CrawlDatum.PARSE_DIR_NAME))
       threads.add(new Thread() {
         @Override
         public void run() {
@@ -359,7 +397,7 @@ public class SegmentReader extends Configured implements Tool {
           }
         }
       });
-    if (pd)
+    if (pd && segmSubdirExists(getConf(), segment, ParseData.DIR_NAME))
       threads.add(new Thread() {
         @Override
         public void run() {
@@ -372,7 +410,7 @@ public class SegmentReader extends Configured implements Tool {
           }
         }
       });
-    if (pt)
+    if (pt && segmSubdirExists(getConf(), segment, ParseText.DIR_NAME))
       threads.add(new Thread() {
         @Override
         public void run() {
@@ -386,6 +424,10 @@ public class SegmentReader extends Configured implements Tool {
         }
       });
     Iterator<Thread> it = threads.iterator();
+    if (!it.hasNext()) {
+      LOG.error("No segment subdirectories specified as input!");
+      return;
+    }
     while (it.hasNext())
       it.next().start();
     int cnt;
@@ -476,7 +518,7 @@ public class SegmentReader extends Configured implements Tool {
    * {@link Metadata#CONTENT_ENCODING} then fallback
    * {@link java.nio.charset.StandardCharsets#UTF_8}
    * @param parseMeta a populated {@link Metadata}
-   * @return {@link Charset} 
+   * @return {@link Charset}
    */
   public static Charset getCharset(Metadata parseMeta) {
     Charset cs = StandardCharsets.UTF_8;
@@ -548,7 +590,7 @@ public class SegmentReader extends Configured implements Tool {
     Text key = new Text();
     CrawlDatum val = new CrawlDatum();
     FileSystem fs = segment.getFileSystem(getConf());
-    
+
     if (ge) {
       SequenceFile.Reader[] readers = SegmentReaderUtil.getReaders(
           new Path(segment, CrawlDatum.GENERATE_DIR_NAME), getConf());
@@ -559,7 +601,7 @@ public class SegmentReader extends Configured implements Tool {
       }
       stats.generated = cnt;
     }
-    
+
     if (fe) {
       Path fetchDir = new Path(segment, CrawlDatum.FETCH_DIR_NAME);
       if (fs.exists(fetchDir) && fs.getFileStatus(fetchDir).isDirectory()) {
@@ -584,7 +626,7 @@ public class SegmentReader extends Configured implements Tool {
         stats.fetched = cnt;
       }
     }
-    
+
     if (pd) {
       Path parseDir = new Path(segment, ParseData.DIR_NAME);
       if (fs.exists(parseDir) && fs.getFileStatus(parseDir).isDirectory()) {

Reply via email to