This is an automated email from the ASF dual-hosted git repository.
snagel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nutch.git
The following commit(s) were added to refs/heads/master by this push:
new 86b893a71 NUTCH-3079 Dumping a segment fails unless it has been
fetched and parsed
86b893a71 is described below
commit 86b893a717260eb5078120387fa223eb5cc88794
Author: Sebastian Nagel <[email protected]>
AuthorDate: Sun Oct 27 21:01:58 2024 +0100
NUTCH-3079 Dumping a segment fails unless it has been fetched and parsed
SegmentReader's dump and get tools now check whether a segment
subdirectory exists before adding it as input. A warning is shown
if a subdirectory does not exist and has not been excluded via one
of the general options (-nogenerate, etc.).
---
.../org/apache/nutch/segment/SegmentReader.java | 96 ++++++++++++++++------
1 file changed, 69 insertions(+), 27 deletions(-)
diff --git a/src/java/org/apache/nutch/segment/SegmentReader.java
b/src/java/org/apache/nutch/segment/SegmentReader.java
index bef980060..b2073f6fc 100644
--- a/src/java/org/apache/nutch/segment/SegmentReader.java
+++ b/src/java/org/apache/nutch/segment/SegmentReader.java
@@ -90,7 +90,7 @@ public class SegmentReader extends Configured implements Tool
{
public static class InputCompatMapper extends
Mapper<WritableComparable<?>, Writable, Text, NutchWritable> {
-
+
private Text newKey = new Text();
@Override
@@ -195,6 +195,28 @@ public class SegmentReader extends Configured implements
Tool {
}
}
+ private static boolean segmSubdirExists(Configuration conf, Path segment,
+ String subDir) throws IOException {
+ Path segmSubPath = new Path(segment, subDir);
+ boolean exists = segmSubPath.getFileSystem(conf).exists(segmSubPath);
+ if (!exists) {
+ LOG.warn("Segment subdirectory {} does not exist in {}!", subDir,
+ segment);
+ }
+ return exists;
+ }
+
+ private static void addSegmSubDirIfExists(List<Path> inputDirs,
Configuration conf,
+ Path segment, String subDir) throws IOException {
+ Path segmSubPath = new Path(segment, subDir);
+ if (segmSubPath.getFileSystem(conf).exists(segmSubPath)) {
+ inputDirs.add(segmSubPath);
+ } else {
+ LOG.warn("Segment subdirectory {} does not exist in {} - skipping!",
subDir,
+ segment);
+ }
+ }
+
public void dump(Path segment, Path output) throws IOException,
InterruptedException, ClassNotFoundException {
@@ -203,21 +225,36 @@ public class SegmentReader extends Configured implements
Tool {
Job job = Job.getInstance(getConf(), "Nutch SegmentReader: " + segment);
Configuration conf = job.getConfiguration();
- if (ge)
- FileInputFormat.addInputPath(job, new Path(segment,
- CrawlDatum.GENERATE_DIR_NAME));
- if (fe)
- FileInputFormat.addInputPath(job, new Path(segment,
- CrawlDatum.FETCH_DIR_NAME));
- if (pa)
- FileInputFormat.addInputPath(job, new Path(segment,
- CrawlDatum.PARSE_DIR_NAME));
- if (co)
- FileInputFormat.addInputPath(job, new Path(segment, Content.DIR_NAME));
- if (pd)
- FileInputFormat.addInputPath(job, new Path(segment, ParseData.DIR_NAME));
- if (pt)
- FileInputFormat.addInputPath(job, new Path(segment, ParseText.DIR_NAME));
+ List<Path> inputDirs = new ArrayList<>();
+ if (ge) {
+ addSegmSubDirIfExists(inputDirs, conf, segment,
+ CrawlDatum.GENERATE_DIR_NAME);
+ }
+ if (fe) {
+ addSegmSubDirIfExists(inputDirs, conf, segment,
+ CrawlDatum.FETCH_DIR_NAME);
+ }
+ if (pa) {
+ addSegmSubDirIfExists(inputDirs, conf, segment,
+ CrawlDatum.PARSE_DIR_NAME);
+ }
+ if (co) {
+ addSegmSubDirIfExists(inputDirs, conf, segment, Content.DIR_NAME);
+ }
+ if (pd) {
+ addSegmSubDirIfExists(inputDirs, conf, segment, ParseData.DIR_NAME);
+ }
+ if (pt) {
+ addSegmSubDirIfExists(inputDirs, conf, segment, ParseText.DIR_NAME);
+ }
+ if (inputDirs.isEmpty()) {
+ String msg = "No segment subdirectories defined as input";
+ LOG.error(msg);
+ throw new RuntimeException(msg);
+ }
+ for (Path p : inputDirs) {
+ FileInputFormat.addInputPath(job, p);
+ }
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setMapperClass(InputCompatMapper.class);
@@ -243,7 +280,7 @@ public class SegmentReader extends Configured implements
Tool {
}
} catch (IOException | InterruptedException | ClassNotFoundException e ){
LOG.error(StringUtils.stringifyException(e));
- throw e;
+ throw e;
}
// concatenate the output
@@ -307,7 +344,7 @@ public class SegmentReader extends Configured implements
Tool {
final Map<String, List<Writable>> results) throws Exception {
LOG.info("SegmentReader: get '{}'", key);
ArrayList<Thread> threads = new ArrayList<>();
- if (co)
+ if (co && segmSubdirExists(getConf(), segment, Content.DIR_NAME))
threads.add(new Thread() {
@Override
public void run() {
@@ -320,7 +357,7 @@ public class SegmentReader extends Configured implements
Tool {
}
}
});
- if (fe)
+ if (fe && segmSubdirExists(getConf(), segment, CrawlDatum.FETCH_DIR_NAME))
threads.add(new Thread() {
@Override
public void run() {
@@ -333,7 +370,8 @@ public class SegmentReader extends Configured implements
Tool {
}
}
});
- if (ge)
+ if (ge
+ && segmSubdirExists(getConf(), segment, CrawlDatum.GENERATE_DIR_NAME))
threads.add(new Thread() {
@Override
public void run() {
@@ -346,7 +384,7 @@ public class SegmentReader extends Configured implements
Tool {
}
}
});
- if (pa)
+ if (pa && segmSubdirExists(getConf(), segment, CrawlDatum.PARSE_DIR_NAME))
threads.add(new Thread() {
@Override
public void run() {
@@ -359,7 +397,7 @@ public class SegmentReader extends Configured implements
Tool {
}
}
});
- if (pd)
+ if (pd && segmSubdirExists(getConf(), segment, ParseData.DIR_NAME))
threads.add(new Thread() {
@Override
public void run() {
@@ -372,7 +410,7 @@ public class SegmentReader extends Configured implements
Tool {
}
}
});
- if (pt)
+ if (pt && segmSubdirExists(getConf(), segment, ParseText.DIR_NAME))
threads.add(new Thread() {
@Override
public void run() {
@@ -386,6 +424,10 @@ public class SegmentReader extends Configured implements
Tool {
}
});
Iterator<Thread> it = threads.iterator();
+ if (!it.hasNext()) {
+ LOG.error("No segment subdirectories specified as input!");
+ return;
+ }
while (it.hasNext())
it.next().start();
int cnt;
@@ -476,7 +518,7 @@ public class SegmentReader extends Configured implements
Tool {
* {@link Metadata#CONTENT_ENCODING} then fallback
* {@link java.nio.charset.StandardCharsets#UTF_8}
* @param parseMeta a populated {@link Metadata}
- * @return {@link Charset}
+ * @return {@link Charset}
*/
public static Charset getCharset(Metadata parseMeta) {
Charset cs = StandardCharsets.UTF_8;
@@ -548,7 +590,7 @@ public class SegmentReader extends Configured implements
Tool {
Text key = new Text();
CrawlDatum val = new CrawlDatum();
FileSystem fs = segment.getFileSystem(getConf());
-
+
if (ge) {
SequenceFile.Reader[] readers = SegmentReaderUtil.getReaders(
new Path(segment, CrawlDatum.GENERATE_DIR_NAME), getConf());
@@ -559,7 +601,7 @@ public class SegmentReader extends Configured implements
Tool {
}
stats.generated = cnt;
}
-
+
if (fe) {
Path fetchDir = new Path(segment, CrawlDatum.FETCH_DIR_NAME);
if (fs.exists(fetchDir) && fs.getFileStatus(fetchDir).isDirectory()) {
@@ -584,7 +626,7 @@ public class SegmentReader extends Configured implements
Tool {
stats.fetched = cnt;
}
}
-
+
if (pd) {
Path parseDir = new Path(segment, ParseData.DIR_NAME);
if (fs.exists(parseDir) && fs.getFileStatus(parseDir).isDirectory()) {