This is an automated email from the ASF dual-hosted git repository.

snagel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nutch.git


The following commit(s) were added to refs/heads/master by this push:
     new 0347527  NUTCH-2279 LinkRank fails when using Hadoop MR output compression - read output directory of link counter job to determine output file name (fail if there is none or more than one file) - determine output codec and use it to read the output
     new 087aea6  Merge pull request #478 from sebastian-nagel/NUTCH-2279-linkrank-output-compression
0347527 is described below

commit 03475276204cb0a31f1f5f0b6a547d3c92c6a799
Author: Sebastian Nagel <sna...@apache.org>
AuthorDate: Mon Sep 30 17:49:39 2019 +0200

    NUTCH-2279 LinkRank fails when using Hadoop MR output compression
    - read output directory of link counter job to determine output
      file name (fail if there is none or more than one file)
    - determine output codec and use it to read the output
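
A note on the pattern the fix uses: Hadoop's CompressionCodecFactory maps a
file name suffix such as ".gz" to a codec and returns null for a plain file.
Below is a minimal, self-contained sketch of reading a possibly compressed
part file this way; CodecAwareReader and readFirstLine are illustrative names,
only the Hadoop API calls are real.

    import java.io.BufferedReader;
    import java.io.InputStream;
    import java.io.InputStreamReader;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;

    public class CodecAwareReader {

      /** Read the first line of a text file that may or may not be compressed. */
      public static String readFirstLine(Configuration conf, Path file)
          throws Exception {
        FileSystem fs = file.getFileSystem(conf);
        // getCodec() inspects the file name suffix (".gz", ".bz2", ...) and
        // returns null if no known compression suffix is found
        CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(file);
        InputStream in = fs.open(file);
        if (codec != null) {
          // wrap the raw stream so callers read decompressed bytes
          in = codec.createInputStream(in);
        }
        try (BufferedReader reader =
            new BufferedReader(new InputStreamReader(in))) {
          return reader.readLine();
        }
      }
    }

The committed change follows the same shape, but first lists the job output
directory to find the part file, since with compression enabled its name is
no longer fixed.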
---
 .../apache/nutch/scoring/webgraph/LinkRank.java    | 38 ++++++++++++++++++++--
 1 file changed, 35 insertions(+), 3 deletions(-)

diff --git a/src/java/org/apache/nutch/scoring/webgraph/LinkRank.java b/src/java/org/apache/nutch/scoring/webgraph/LinkRank.java
index 6829927..b6bfa98 100644
--- a/src/java/org/apache/nutch/scoring/webgraph/LinkRank.java
+++ b/src/java/org/apache/nutch/scoring/webgraph/LinkRank.java
@@ -18,6 +18,7 @@ package org.apache.nutch.scoring.webgraph;
 
 import java.io.BufferedReader;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.lang.invoke.MethodHandles;
 import java.text.SimpleDateFormat;
@@ -39,6 +40,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
@@ -46,6 +48,8 @@ import org.apache.hadoop.io.ObjectWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.Job;
@@ -128,14 +132,42 @@ public class LinkRank extends Configured implements Tool {
 
     // read the first (and only) line from the file which should be the
     // number of links in the web graph
-    LOG.info("Reading numlinks temp file");
-    FSDataInputStream readLinks = fs.open(new Path(numLinksPath, "part-r-00000"));
-    BufferedReader buffer = new BufferedReader(new InputStreamReader(readLinks));
+    FileStatus[] numLinksFiles = fs.listStatus(numLinksPath);
+    if (numLinksFiles.length == 0) {
+      throw new IOException("Failed to read numlinks temp file: "
+          + " no file found in " + numLinksPath);
+    } else if (numLinksFiles.length > 1) {
+      throw new IOException("Failed to read numlinks temp file: "
+          + " expected only one file but found " + numLinksFiles.length
+          + " files in folder " + numLinksPath);
+    }
+    Path numLinksFile = numLinksFiles[0].getPath();
+    LOG.info("Reading numlinks temp file {}", numLinksFile);
+    FSDataInputStream readLinks = fs.open(numLinksFile);
+    CompressionCodecFactory cf = new CompressionCodecFactory(conf);
+    CompressionCodec codec = cf.getCodec(numLinksFiles[0].getPath());
+    InputStream streamLinks;
+    if (codec == null) {
+      LOG.debug("No compression codec found for {}, trying uncompressed",
+          numLinksFile);
+      streamLinks = readLinks;
+    } else {
+      LOG.info("Compression codec of numlinks temp file: {}",
+          codec.getDefaultExtension());
+      readLinks.seek(0);
+      streamLinks = codec.createInputStream(readLinks);
+    }
+    BufferedReader buffer = new BufferedReader(
+        new InputStreamReader(streamLinks));
+
     String numLinksLine = buffer.readLine();
     readLinks.close();
 
     // check if there are links to process, if none, webgraph might be empty
     if (numLinksLine == null || numLinksLine.length() == 0) {
+      LOG.error(
+          "Failed to determine number of links because of empty line in input 
{}",
+          numLinksFile);
       fs.delete(numLinksPath, true);
       throw new IOException("No links to process, is the webgraph empty?");
     }
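
For context, the bug only surfaces when MR output compression is enabled,
either via mapreduce.output.fileoutputformat.compress=true in the job
configuration or programmatically as sketched below; CompressedOutputSetup
and newCompressedJob are hypothetical names, the FileOutputFormat calls are
the standard Hadoop API.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class CompressedOutputSetup {

      /** Create a job whose reducer output is gzip-compressed. */
      public static Job newCompressedJob(Configuration conf) throws Exception {
        Job job = Job.getInstance(conf, "example");
        // with compression on, reducers write part-r-00000.gz (or another
        // codec suffix) instead of part-r-00000
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        return job;
      }
    }

This is why the previous hard-coded open of "part-r-00000" could not find its
input, and why the fix lists the directory and lets the codec factory pick
the right decompressor.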
