This is an automated email from the ASF dual-hosted git repository.

snagel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nutch.git

The following commit(s) were added to refs/heads/master by this push:
     new 0347527  NUTCH-2279 LinkRank fails when using Hadoop MR output compression - read output directory of link counter job to determine output file name (fail if there is none or more than one file) - determine output codec and use it to read the output
     new 087aea6  Merge pull request #478 from sebastian-nagel/NUTCH-2279-linkrank-output-compression
0347527 is described below

commit 03475276204cb0a31f1f5f0b6a547d3c92c6a799
Author: Sebastian Nagel <sna...@apache.org>
AuthorDate: Mon Sep 30 17:49:39 2019 +0200

    NUTCH-2279 LinkRank fails when using Hadoop MR output compression

    - read output directory of link counter job to determine output file name
      (fail if there is none or more than one file)
    - determine output codec and use it to read the output
---
 .../apache/nutch/scoring/webgraph/LinkRank.java    | 38 ++++++++++++++++++++--
 1 file changed, 35 insertions(+), 3 deletions(-)

diff --git a/src/java/org/apache/nutch/scoring/webgraph/LinkRank.java b/src/java/org/apache/nutch/scoring/webgraph/LinkRank.java
index 6829927..b6bfa98 100644
--- a/src/java/org/apache/nutch/scoring/webgraph/LinkRank.java
+++ b/src/java/org/apache/nutch/scoring/webgraph/LinkRank.java
@@ -18,6 +18,7 @@ package org.apache.nutch.scoring.webgraph;
 
 import java.io.BufferedReader;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.lang.invoke.MethodHandles;
 import java.text.SimpleDateFormat;
@@ -39,6 +40,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
@@ -46,6 +48,8 @@ import org.apache.hadoop.io.ObjectWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.Job;
@@ -128,14 +132,42 @@ public class LinkRank extends Configured implements Tool {
 
     // read the first (and only) line from the file which should be the
     // number of links in the web graph
-    LOG.info("Reading numlinks temp file");
-    FSDataInputStream readLinks = fs.open(new Path(numLinksPath, "part-r-00000"));
-    BufferedReader buffer = new BufferedReader(new InputStreamReader(readLinks));
+    FileStatus[] numLinksFiles = fs.listStatus(numLinksPath);
+    if (numLinksFiles.length == 0) {
+      throw new IOException("Failed to read numlinks temp file: "
+          + " no file found in " + numLinksPath);
+    } else if (numLinksFiles.length > 1) {
+      throw new IOException("Failed to read numlinks temp file: "
+          + " expected only one file but found " + numLinksFiles.length
+          + " files in folder " + numLinksPath);
+    }
+    Path numLinksFile = numLinksFiles[0].getPath();
+    LOG.info("Reading numlinks temp file {}", numLinksFile);
+    FSDataInputStream readLinks = fs.open(numLinksFile);
+    CompressionCodecFactory cf = new CompressionCodecFactory(conf);
+    CompressionCodec codec = cf.getCodec(numLinksFiles[0].getPath());
+    InputStream streamLinks;
+    if (codec == null) {
+      LOG.debug("No compression codec found for {}, trying uncompressed",
+          numLinksFile);
+      streamLinks = readLinks;
+    } else {
+      LOG.info("Compression codec of numlinks temp file: {}",
+          codec.getDefaultExtension());
+      readLinks.seek(0);
+      streamLinks = codec.createInputStream(readLinks);
+    }
+    BufferedReader buffer = new BufferedReader(
+        new InputStreamReader(streamLinks));
+
     String numLinksLine = buffer.readLine();
     readLinks.close();
 
     // check if there are links to process, if none, webgraph might be empty
     if (numLinksLine == null || numLinksLine.length() == 0) {
+      LOG.error(
+          "Failed to determine number of links because of empty line in input {}",
+          numLinksFile);
       fs.delete(numLinksPath, true);
       throw new IOException("No links to process, is the webgraph empty?");
     }
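
For readers following along, here is a minimal standalone sketch of the
technique the patch applies: list the job output directory instead of
hard-coding the part file name (with output compression enabled the reducer
writes e.g. part-r-00000.gz, so the fixed name no longer matches), and let
Hadoop's CompressionCodecFactory pick a codec from the file extension before
reading. The class name ReadMaybeCompressedOutput and the command-line
argument are illustrative only and not part of Nutch.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class ReadMaybeCompressedOutput {

  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path outputDir = new Path(args[0]); // the job's output directory

    // List the directory and insist on exactly one file, as the patch does
    // (a production version might also need to skip a _SUCCESS marker file).
    FileStatus[] files = fs.listStatus(outputDir);
    if (files.length != 1) {
      throw new IOException("Expected exactly one output file in "
          + outputDir + " but found " + files.length);
    }
    Path file = files[0].getPath();

    // getCodec() matches on the file name suffix (.gz, .bz2, ...) and
    // returns null for an unrecognized or absent extension.
    CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    CompressionCodec codec = factory.getCodec(file);

    InputStream in = fs.open(file);
    if (codec != null) {
      // Wrap the raw stream with the codec's decompressing stream.
      in = codec.createInputStream(in);
    }
    try (BufferedReader reader = new BufferedReader(
        new InputStreamReader(in, StandardCharsets.UTF_8))) {
      System.out.println("First line: " + reader.readLine());
    }
  }
}

Because getCodec() decides purely by file name extension, the uncompressed
case falls through to the raw stream, which is the same fallback the patched
LinkRank code takes when no codec is found.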