[ 
https://issues.apache.org/jira/browse/NUTCH-2569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16453860#comment-16453860
 ] 

ASF GitHub Bot commented on NUTCH-2569:
---------------------------------------

sebastian-nagel closed pull request #322: NUTCH-2569 ClassNotFoundException 
when running in (pseudo-)distributed mode
URL: https://github.com/apache/nutch/pull/322
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/src/java/org/apache/nutch/crawl/CrawlDb.java 
b/src/java/org/apache/nutch/crawl/CrawlDb.java
index 7af3b6b19..333a7b68b 100644
--- a/src/java/org/apache/nutch/crawl/CrawlDb.java
+++ b/src/java/org/apache/nutch/crawl/CrawlDb.java
@@ -181,6 +181,7 @@ public static Job createJob(Configuration config, Path 
crawlDb)
 
     job.setMapperClass(CrawlDbFilter.class);
     job.setReducerClass(CrawlDbReducer.class);
+    job.setJarByClass(CrawlDb.class);
 
     FileOutputFormat.setOutputPath(job, newCrawlDb);
     job.setOutputFormatClass(MapFileOutputFormat.class);
diff --git a/src/java/org/apache/nutch/crawl/CrawlDbMerger.java 
b/src/java/org/apache/nutch/crawl/CrawlDbMerger.java
index 3a83e416e..4d9ce0d90 100644
--- a/src/java/org/apache/nutch/crawl/CrawlDbMerger.java
+++ b/src/java/org/apache/nutch/crawl/CrawlDbMerger.java
@@ -77,7 +77,8 @@
     public void close() throws IOException {
     }
 
-    public void setup(Reducer.Context context) {
+    public void setup(
+        Reducer<Text, CrawlDatum, Text, CrawlDatum>.Context context) {
       Configuration conf = context.getConfiguration();
       schedule = FetchScheduleFactory.getFetchSchedule(conf);
     }
@@ -179,6 +180,7 @@ public static Job createMergeJob(Configuration conf, Path 
output,
 
     job.setInputFormatClass(SequenceFileInputFormat.class);
 
+    job.setJarByClass(CrawlDbMerger.class);
     job.setMapperClass(CrawlDbFilter.class);
     conf.setBoolean(CrawlDbFilter.URL_FILTERING, filter);
     conf.setBoolean(CrawlDbFilter.URL_NORMALIZING, normalize);
diff --git a/src/java/org/apache/nutch/crawl/CrawlDbReader.java 
b/src/java/org/apache/nutch/crawl/CrawlDbReader.java
index 87bf58525..7e00ece5b 100644
--- a/src/java/org/apache/nutch/crawl/CrawlDbReader.java
+++ b/src/java/org/apache/nutch/crawl/CrawlDbReader.java
@@ -390,6 +390,7 @@ public void close() {
          FileInputFormat.addInputPath(job, new Path(crawlDb, 
CrawlDb.CURRENT_NAME));
          job.setInputFormatClass(SequenceFileInputFormat.class);
 
+         job.setJarByClass(CrawlDbReader.class);
          job.setMapperClass(CrawlDbStatMapper.class);
          job.setCombinerClass(CrawlDbStatReducer.class);
          job.setReducerClass(CrawlDbStatReducer.class);
@@ -690,6 +691,7 @@ public void processDumpJob(String crawlDb, String output,
     job.setMapperClass(CrawlDbDumpMapper.class);
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(CrawlDatum.class);
+    job.setJarByClass(CrawlDbReader.class);
 
     try {
       boolean success = job.waitForCompletion(true);
@@ -794,6 +796,8 @@ public void processTopNJob(String crawlDb, long topN, float 
min,
     job.setJobName("topN prepare " + crawlDb);
     FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME));
     job.setInputFormatClass(SequenceFileInputFormat.class);
+
+    job.setJarByClass(CrawlDbReader.class);
     job.setMapperClass(CrawlDbTopNMapper.class);
     job.setReducerClass(Reducer.class);
 
@@ -832,6 +836,7 @@ public void processTopNJob(String crawlDb, long topN, float 
min,
     job.setInputFormatClass(SequenceFileInputFormat.class);
     job.setMapperClass(Mapper.class);
     job.setReducerClass(CrawlDbTopNReducer.class);
+    job.setJarByClass(CrawlDbReader.class);
 
     FileOutputFormat.setOutputPath(job, outFolder);
     job.setOutputFormatClass(TextOutputFormat.class);
diff --git a/src/java/org/apache/nutch/crawl/DeduplicationJob.java 
b/src/java/org/apache/nutch/crawl/DeduplicationJob.java
index 555f9e2eb..eaeb83581 100644
--- a/src/java/org/apache/nutch/crawl/DeduplicationJob.java
+++ b/src/java/org/apache/nutch/crawl/DeduplicationJob.java
@@ -296,6 +296,7 @@ public int run(String[] args) throws IOException {
     job.setJobName("Deduplication on " + crawldb);
     conf.set(DEDUPLICATION_GROUP_MODE, group);
     conf.set(DEDUPLICATION_COMPARE_ORDER, compareOrder);
+    job.setJarByClass(DeduplicationJob.class);
 
     FileInputFormat.addInputPath(job, new Path(crawldb, CrawlDb.CURRENT_NAME));
     job.setInputFormatClass(SequenceFileInputFormat.class);
diff --git a/src/java/org/apache/nutch/crawl/Generator.java 
b/src/java/org/apache/nutch/crawl/Generator.java
index a3ef91c89..9c22ee228 100644
--- a/src/java/org/apache/nutch/crawl/Generator.java
+++ b/src/java/org/apache/nutch/crawl/Generator.java
@@ -848,10 +848,9 @@ public Generator(Configuration conf) {
       }
       FileInputFormat.addInputPath(job, new Path(dbDir, CrawlDb.CURRENT_NAME));
       job.setInputFormatClass(SequenceFileInputFormat.class);
-      job.setJarByClass(CrawlDbUpdater.class);
       job.setMapperClass(CrawlDbUpdater.CrawlDbUpdateMapper.class);
       job.setReducerClass(CrawlDbUpdater.CrawlDbUpdateReducer.class);
-      //job.setJarByClass(CrawlDbUpdater.class);
+      job.setJarByClass(CrawlDbUpdater.class);
       job.setOutputFormatClass(MapFileOutputFormat.class);
       job.setOutputKeyClass(Text.class);
       job.setOutputValueClass(CrawlDatum.class);
diff --git a/src/java/org/apache/nutch/crawl/LinkDbReader.java 
b/src/java/org/apache/nutch/crawl/LinkDbReader.java
index 519fa5996..cc0b6f614 100644
--- a/src/java/org/apache/nutch/crawl/LinkDbReader.java
+++ b/src/java/org/apache/nutch/crawl/LinkDbReader.java
@@ -27,13 +27,9 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.MapFile;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
@@ -44,7 +40,6 @@
 import org.apache.hadoop.mapreduce.Partitioner;
 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hadoop.conf.Configuration;
 
@@ -57,14 +52,15 @@
 import java.util.Iterator;
 import java.io.Closeable;
 
-/** . */
+/**
+ * Read utility for the LinkDb.
+ */
 public class LinkDbReader extends AbstractChecker implements Closeable {
   private static final Logger LOG = LoggerFactory
       .getLogger(MethodHandles.lookup().lookupClass());
 
-  private static final Partitioner<WritableComparable, Writable> PARTITIONER = 
new HashPartitioner<>();
+  private static final Partitioner<Text, Inlinks> PARTITIONER = new 
HashPartitioner<>();
 
-  private FileSystem fs;
   private Path directory;
   private MapFile.Reader[] readers;
 
@@ -78,7 +74,6 @@ public LinkDbReader(Configuration conf, Path directory) 
throws Exception {
   }
 
   public void init(Path directory) throws Exception {
-    this.fs = directory.getFileSystem(getConf());
     this.directory = directory;
   }
 
@@ -149,8 +144,9 @@ public void processDumpJob(String linkdb, String output, 
String regex)
 
     Job job = NutchJob.getInstance(getConf());
     job.setJobName("read " + linkdb);
+    job.setJarByClass(LinkDbReader.class);
     
-    Configuration conf = job.getConfiguration();   
+    Configuration conf = job.getConfiguration();
  
     if (regex != null) {
       conf.set("linkdb.regex", regex);
diff --git a/src/java/org/apache/nutch/hostdb/ReadHostDb.java 
b/src/java/org/apache/nutch/hostdb/ReadHostDb.java
index 81145d312..360b11932 100644
--- a/src/java/org/apache/nutch/hostdb/ReadHostDb.java
+++ b/src/java/org/apache/nutch/hostdb/ReadHostDb.java
@@ -37,7 +37,6 @@
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -186,7 +185,6 @@ private void readHostDb(Path hostDb, Path output, boolean 
dumpHomepages, boolean
     FileInputFormat.addInputPath(job, new Path(hostDb, "current"));
     FileOutputFormat.setOutputPath(job, output);
 
-    job.setJarByClass(ReadHostDb.class);
     job.setMapperClass(ReadHostDbMapper.class);
 
     job.setInputFormatClass(SequenceFileInputFormat.class);
diff --git a/src/java/org/apache/nutch/indexer/CleaningJob.java 
b/src/java/org/apache/nutch/indexer/CleaningJob.java
index 3ac8b9e39..fd5dff913 100644
--- a/src/java/org/apache/nutch/indexer/CleaningJob.java
+++ b/src/java/org/apache/nutch/indexer/CleaningJob.java
@@ -155,6 +155,7 @@ public void delete(String crawldb, boolean noCommit)
     job.setMapOutputValueClass(Text.class);
     job.setMapperClass(DBFilter.class);
     job.setReducerClass(DeleterReducer.class);
+    job.setJarByClass(CleaningJob.class);
 
     job.setJobName("CleaningJob");
 
diff --git a/src/java/org/apache/nutch/scoring/webgraph/LinkDumper.java 
b/src/java/org/apache/nutch/scoring/webgraph/LinkDumper.java
index e134441dd..bde24cc33 100644
--- a/src/java/org/apache/nutch/scoring/webgraph/LinkDumper.java
+++ b/src/java/org/apache/nutch/scoring/webgraph/LinkDumper.java
@@ -88,7 +88,6 @@ public static void main(String[] args) throws Exception {
       // open the readers for the linkdump directory
       Configuration conf = NutchConfiguration.create();
       Path webGraphDb = new Path(args[0]);
-      FileSystem fs = webGraphDb.getFileSystem(conf);
       String url = args[1];
       MapFile.Reader[] readers = MapFileOutputFormat.getReaders(new Path(
           webGraphDb, DUMP_DIR), conf);
@@ -351,10 +350,17 @@ public void dumpLinks(Path webGraphDb) throws IOException,
 
     try {
       LOG.info("LinkDumper: running inverter");
-      int complete = inverter.waitForCompletion(true)?0:1;
+      boolean success = inverter.waitForCompletion(true);
+      if (!success) {
+        String message = "LinkDumper inverter job did not succeed, job status:"
+            + inverter.getStatus().getState() + ", reason: "
+            + inverter.getStatus().getFailureInfo();
+        LOG.error(message);
+        throw new RuntimeException(message);
+      }
       LOG.info("LinkDumper: finished inverter");
     } catch (IOException | InterruptedException | ClassNotFoundException e) {
-      LOG.error(StringUtils.stringifyException(e));
+      LOG.error("LinkDumper inverter job failed:", e);
       throw e;
     }
 
@@ -362,6 +368,7 @@ public void dumpLinks(Path webGraphDb) throws IOException,
     Job merger = NutchJob.getInstance(conf);
     merger.setJobName("LinkDumper: merger");
     FileInputFormat.addInputPath(merger, tempInverted);
+    merger.setJarByClass(Merger.class);
     merger.setInputFormatClass(SequenceFileInputFormat.class);
     merger.setReducerClass(Merger.class);
     merger.setMapOutputKeyClass(Text.class);
@@ -373,10 +380,17 @@ public void dumpLinks(Path webGraphDb) throws IOException,
 
     try {
       LOG.info("LinkDumper: running merger");
-      int complete = merger.waitForCompletion(true)?0:1;
+      boolean success = merger.waitForCompletion(true);
+      if (!success) {
+        String message = "LinkDumper merger job did not succeed, job status:"
+            + merger.getStatus().getState() + ", reason: "
+            + merger.getStatus().getFailureInfo();
+        LOG.error(message);
+        throw new RuntimeException(message);
+      }
       LOG.info("LinkDumper: finished merger");
     } catch (IOException e) {
-      LOG.error(StringUtils.stringifyException(e));
+      LOG.error("LinkDumper merger job failed:", e);
       throw e;
     }
 
diff --git a/src/java/org/apache/nutch/scoring/webgraph/LinkRank.java 
b/src/java/org/apache/nutch/scoring/webgraph/LinkRank.java
index 6b964eb0e..682992756 100644
--- a/src/java/org/apache/nutch/scoring/webgraph/LinkRank.java
+++ b/src/java/org/apache/nutch/scoring/webgraph/LinkRank.java
@@ -111,9 +111,16 @@ private int runCounter(FileSystem fs, Path webGraphDb) 
throws IOException,
     // run the counter job, outputs to a single reduce task and file
     LOG.info("Starting link counter job");
     try {
-      int complete = counter.waitForCompletion(true)?0:1;
+      boolean success = counter.waitForCompletion(true);
+      if (!success) {
+        String message = "Link counter job did not succeed, job status:"
+            + counter.getStatus().getState() + ", reason: "
+            + counter.getStatus().getFailureInfo();
+        LOG.error(message);
+        throw new RuntimeException(message);
+      }
     } catch (IOException | InterruptedException | ClassNotFoundException e) {
-      LOG.error(StringUtils.stringifyException(e));
+      LOG.error("Link counter job failed:", e);
       throw e;
     }
     
@@ -122,7 +129,7 @@ private int runCounter(FileSystem fs, Path webGraphDb) 
throws IOException,
     // read the first (and only) line from the file which should be the
     // number of links in the web graph
     LOG.info("Reading numlinks temp file");
-    FSDataInputStream readLinks = fs.open(new Path(numLinksPath, 
"part-00000"));
+    FSDataInputStream readLinks = fs.open(new Path(numLinksPath, 
"part-r-00000"));
     BufferedReader buffer = new BufferedReader(new 
InputStreamReader(readLinks));
     String numLinksLine = buffer.readLine();
     readLinks.close();
@@ -161,6 +168,7 @@ private void runInitializer(Path nodeDb, Path output) 
throws IOException,
     initializer.setJobName("LinkAnalysis Initializer");
     FileInputFormat.addInputPath(initializer, nodeDb);
     FileOutputFormat.setOutputPath(initializer, output);
+    initializer.setJarByClass(Initializer.class);
     initializer.setInputFormatClass(SequenceFileInputFormat.class);
     initializer.setMapperClass(Initializer.class);
     initializer.setMapOutputKeyClass(Text.class);
@@ -174,9 +182,16 @@ private void runInitializer(Path nodeDb, Path output) 
throws IOException,
     // run the initializer
     LOG.info("Starting initialization job");
     try {
-      int complete = initializer.waitForCompletion(true)?0:1;
+      boolean success = initializer.waitForCompletion(true);
+      if (!success) {
+        String message = "Initialization job did not succeed, job status:"
+            + initializer.getStatus().getState() + ", reason: "
+            + initializer.getStatus().getFailureInfo();
+        LOG.error(message);
+        throw new RuntimeException(message);
+      }
     } catch (IOException | InterruptedException | ClassNotFoundException e) {
-      LOG.error(StringUtils.stringifyException(e));
+      LOG.error("Initialization job failed:", e);
       throw e;
     }
     LOG.info("Finished initialization job.");
@@ -221,9 +236,16 @@ private void runInverter(Path nodeDb, Path outlinkDb, Path 
output)
     // run the inverter job
     LOG.info("Starting inverter job");
     try {
-      int complete = inverter.waitForCompletion(true)?0:1;
+      boolean success = inverter.waitForCompletion(true);
+      if (!success) {
+        String message = "Inverter job did not succeed, job status:"
+            + inverter.getStatus().getState() + ", reason: "
+            + inverter.getStatus().getFailureInfo();
+        LOG.error(message);
+        throw new RuntimeException(message);
+      }
     } catch (IOException | InterruptedException | ClassNotFoundException e) {
-      LOG.error(StringUtils.stringifyException(e));
+      LOG.error("Inverter job failed:", e);
       throw e;
     }
     LOG.info("Finished inverter job.");
@@ -278,9 +300,16 @@ private void runAnalysis(Path nodeDb, Path inverted, Path 
output,
 
     LOG.info("Starting analysis job");
     try {
-      int complete = analyzer.waitForCompletion(true)?0:1;
+      boolean success = analyzer.waitForCompletion(true);
+      if (!success) {
+        String message = "Analysis job did not succeed, job status:"
+            + analyzer.getStatus().getState() + ", reason: "
+            + analyzer.getStatus().getFailureInfo();
+        LOG.error(message);
+        throw new RuntimeException(message);
+      }
     } catch (IOException | InterruptedException | ClassNotFoundException e) {
-      LOG.error(StringUtils.stringifyException(e));
+      LOG.error("Analysis job failed:", e);
       throw e;
     }
     LOG.info("Finished analysis job.");
@@ -331,8 +360,6 @@ public void reduce(Text key, Iterable<LongWritable> values,
       }
     }
 
-    public void close() {
-    }
   }
 
   private static class Initializer extends Mapper<Text, Node, Text, Node> {
@@ -372,11 +399,8 @@ public void map(Text key, Node node, Context context)
     public static class InvertMapper extends 
         Mapper<Text, Writable, Text, ObjectWritable> {
 
-      private Configuration conf;
-
       @Override
       public void setup(Mapper<Text, Writable, Text, ObjectWritable>.Context 
context) {
-        conf = context.getConfiguration();
       }
 
       @Override
@@ -502,19 +526,14 @@ public void map(Text key, Writable value,
        * needed values for analysis.
        */
       @Override
-      public void setup(Reducer<Text, ObjectWritable, Text, Node>.Context 
context) {
-        try {
-          conf = context.getConfiguration();
-          dampingFactor = conf
-              .getFloat("link.analyze.damping.factor", 0.85f);
-          rankOne = conf.getFloat("link.analyze.rank.one", 0.0f);
-          itNum = conf.getInt("link.analyze.iteration", 0);
-          limitPages = conf.getBoolean("link.ignore.limit.page", true);
-          limitDomains = conf.getBoolean("link.ignore.limit.domain", true);
-        } catch (Exception e) {
-          LOG.error(StringUtils.stringifyException(e));
-          throw new IllegalArgumentException(e);
-        }
+      public void setup(
+          Reducer<Text, ObjectWritable, Text, Node>.Context context) {
+        conf = context.getConfiguration();
+        dampingFactor = conf.getFloat("link.analyze.damping.factor", 0.85f);
+        rankOne = conf.getFloat("link.analyze.rank.one", 0.0f);
+        itNum = conf.getInt("link.analyze.iteration", 0);
+        limitPages = conf.getBoolean("link.ignore.limit.page", true);
+        limitDomains = conf.getBoolean("link.ignore.limit.domain", true);
       }
 
       @Override
diff --git a/src/java/org/apache/nutch/scoring/webgraph/NodeDumper.java 
b/src/java/org/apache/nutch/scoring/webgraph/NodeDumper.java
index 84216478b..61b2061d4 100644
--- a/src/java/org/apache/nutch/scoring/webgraph/NodeDumper.java
+++ b/src/java/org/apache/nutch/scoring/webgraph/NodeDumper.java
@@ -335,9 +335,16 @@ public void dumpNodes(Path webGraphDb, DumpType type, long 
topN, Path output,
 
     try {
       LOG.info("NodeDumper: running");
-      int complete = dumper.waitForCompletion(true)?0:1;
+      boolean success = dumper.waitForCompletion(true);
+      if (!success) {
+        String message = "NodeDumper job did not succeed, job status:"
+            + dumper.getStatus().getState() + ", reason: "
+            + dumper.getStatus().getFailureInfo();
+        LOG.error(message);
+        throw new RuntimeException(message);
+      }
     } catch (IOException e) {
-      LOG.error(StringUtils.stringifyException(e));
+      LOG.error("NodeDumper job failed:", e);
       throw e;
     }
     long end = System.currentTimeMillis();
diff --git a/src/java/org/apache/nutch/scoring/webgraph/ScoreUpdater.java 
b/src/java/org/apache/nutch/scoring/webgraph/ScoreUpdater.java
index a5bb3f348..93a7c95dc 100644
--- a/src/java/org/apache/nutch/scoring/webgraph/ScoreUpdater.java
+++ b/src/java/org/apache/nutch/scoring/webgraph/ScoreUpdater.java
@@ -184,9 +184,21 @@ public void update(Path crawlDb, Path webGraphDb) throws 
IOException,
     updater.setOutputFormatClass(MapFileOutputFormat.class);
 
     try {
-      int complete = updater.waitForCompletion(true)?0:1;
+      boolean success = updater.waitForCompletion(true);
+      if (!success) {
+        String message = "Update CrawlDb from WebGraph job did not succeed, 
job status:"
+            + updater.getStatus().getState() + ", reason: "
+            + updater.getStatus().getFailureInfo();
+        LOG.error(message);
+        // remove the temp crawldb on error
+        FileSystem fs = newCrawlDb.getFileSystem(conf);
+        if (fs.exists(newCrawlDb)) {
+          fs.delete(newCrawlDb, true);
+        }
+        throw new RuntimeException(message);
+      }
     } catch (IOException | ClassNotFoundException | InterruptedException e) {
-      LOG.error(StringUtils.stringifyException(e));
+      LOG.error("Update CrawlDb from WebGraph:", e);
 
       // remove the temp crawldb on error
       FileSystem fs = newCrawlDb.getFileSystem(conf);
diff --git a/src/java/org/apache/nutch/scoring/webgraph/WebGraph.java 
b/src/java/org/apache/nutch/scoring/webgraph/WebGraph.java
index cfef442c8..06728d1be 100644
--- a/src/java/org/apache/nutch/scoring/webgraph/WebGraph.java
+++ b/src/java/org/apache/nutch/scoring/webgraph/WebGraph.java
@@ -153,14 +153,6 @@ public OutlinkDb(Configuration conf) {
     public static class OutlinkDbMapper extends
         Mapper<Text, Writable, Text, NutchWritable> {
 
-      // ignoring internal domains, internal hosts
-      private boolean ignoreDomain = true;
-      private boolean ignoreHost = true;
-
-      // limiting urls out to a page or to a domain
-      private boolean limitPages = true;
-      private boolean limitDomains = true;
-
       // using normalizers and/or filters
       private boolean normalize = false;
       private boolean filter = false;
@@ -230,10 +222,6 @@ private String filterUrl(String url) {
       public void setup(Mapper<Text, Writable, Text, NutchWritable>.Context 
context) {
         Configuration config = context.getConfiguration();
         conf = config;
-        ignoreHost = conf.getBoolean("link.ignore.internal.host", true);
-        ignoreDomain = conf.getBoolean("link.ignore.internal.domain", true);
-        limitPages = conf.getBoolean("link.ignore.limit.page", true);
-        limitDomains = conf.getBoolean("link.ignore.limit.domain", true);
 
         normalize = conf.getBoolean(URL_NORMALIZING, false);
         filter = conf.getBoolean(URL_FILTERING, false);
@@ -336,13 +324,7 @@ public void map(Text key, Writable value,
       private boolean limitPages = true;
       private boolean limitDomains = true;
 
-      // using normalizers and/or filters
-      private boolean normalize = false;
-      private boolean filter = false;
-
       // url normalizers, filters and job configuration
-      private URLNormalizers urlNormalizers;
-      private URLFilters filters;
       private Configuration conf;
 
       /**
@@ -356,16 +338,6 @@ public void setup(Reducer<Text, NutchWritable, Text, 
LinkDatum>.Context context)
         limitPages = conf.getBoolean("link.ignore.limit.page", true);
         limitDomains = conf.getBoolean("link.ignore.limit.domain", true);
         
-        normalize = conf.getBoolean(URL_NORMALIZING, false);
-        filter = conf.getBoolean(URL_FILTERING, false);
-
-        if (normalize) {
-          urlNormalizers = new URLNormalizers(conf, 
URLNormalizers.SCOPE_DEFAULT);
-        }
-
-        if (filter) {
-          filters = new URLFilters(conf);
-        }
       }
    
       public void reduce(Text key, Iterable<NutchWritable> values,
@@ -376,7 +348,8 @@ public void reduce(Text key, Iterable<NutchWritable> values,
         // which should be the timestamp for all of the most recent outlinks
         long mostRecent = 0L;
         List<LinkDatum> outlinkList = new ArrayList<>();
-        for (Writable value : values) {
+        for (NutchWritable val : values) {
+          final Writable value = val.get();
 
           if (value instanceof LinkDatum) {
             // loop through, change out most recent timestamp if needed
@@ -447,9 +420,6 @@ public void close() {
 
     private static long timestamp;
 
-    public void close() {
-    }
-
     /**
      * Inverts the Outlink LinkDatum objects into new LinkDatum objects with a
      * new system timestamp, type and to and from url switched.
@@ -488,9 +458,6 @@ public void map(Text key, LinkDatum datum,
    */
   private static class NodeDb extends Configured {
 
-    public void close() {
-    }
-
     /**
      * Counts the number of inlinks and outlinks for each url and sets a 
default
      * score of 0.0 for each url (node) in the webgraph.
@@ -634,7 +601,15 @@ public void createWebGraph(Path webGraphDb, Path[] 
segments,
     // run the outlinkdb job and replace any old outlinkdb with the new one
     try {
       LOG.info("OutlinkDb: running");
-      int complete = outlinkJob.waitForCompletion(true)?0:1;
+      boolean success = outlinkJob.waitForCompletion(true);
+      if (!success) {
+        String message = "OutlinkDb job did not succeed, job status:"
+            + outlinkJob.getStatus().getState() + ", reason: "
+            + outlinkJob.getStatus().getFailureInfo();
+        LOG.error(message);
+        NutchJob.cleanupAfterFailure(tempOutlinkDb, lock, fs);
+        throw new RuntimeException(message);
+      }
       LOG.info("OutlinkDb: installing " + outlinkDb);
       FSUtils.replace(fs, oldOutlinkDb, outlinkDb, true);
       FSUtils.replace(fs, outlinkDb, tempOutlinkDb, true);
@@ -642,13 +617,9 @@ public void createWebGraph(Path webGraphDb, Path[] 
segments,
         fs.delete(oldOutlinkDb, true);
       LOG.info("OutlinkDb: finished");
     } catch (IOException | InterruptedException | ClassNotFoundException e) {
-
+      LOG.error("OutlinkDb failed:", e);
       // remove lock file and and temporary directory if an error occurs
-      LockUtil.removeLockFile(fs, lock);
-      if (fs.exists(tempOutlinkDb)) {
-        fs.delete(tempOutlinkDb, true);
-      }
-      LOG.error(StringUtils.stringifyException(e));
+      NutchJob.cleanupAfterFailure(tempOutlinkDb, lock, fs);
       throw e;
     }
 
@@ -677,18 +648,22 @@ public void createWebGraph(Path webGraphDb, Path[] 
segments,
 
       // run the inlink and replace any old with new
       LOG.info("InlinkDb: running");
-      int complete = inlinkJob.waitForCompletion(true)?0:1;
+      boolean success = inlinkJob.waitForCompletion(true);
+      if (!success) {
+        String message = "InlinkDb job did not succeed, job status:"
+            + inlinkJob.getStatus().getState() + ", reason: "
+            + inlinkJob.getStatus().getFailureInfo();
+        LOG.error(message);
+        NutchJob.cleanupAfterFailure(tempInlinkDb, lock, fs);
+        throw new RuntimeException(message);
+      }
       LOG.info("InlinkDb: installing " + inlinkDb);
       FSUtils.replace(fs, inlinkDb, tempInlinkDb, true);
       LOG.info("InlinkDb: finished");
     } catch (IOException | InterruptedException | ClassNotFoundException e) {
-
+      LOG.error("InlinkDb failed:", e);
       // remove lock file and and temporary directory if an error occurs
-      LockUtil.removeLockFile(fs, lock);
-      if (fs.exists(tempInlinkDb)) {
-        fs.delete(tempInlinkDb, true);
-      }
-      LOG.error(StringUtils.stringifyException(e));
+      NutchJob.cleanupAfterFailure(tempInlinkDb, lock, fs);
       throw e;
     }
 
@@ -719,18 +694,23 @@ public void createWebGraph(Path webGraphDb, Path[] 
segments,
 
       // run the node job and replace old nodedb with new
       LOG.info("NodeDb: running");
-      int complete = nodeJob.waitForCompletion(true)?0:1;
+      boolean success = nodeJob.waitForCompletion(true);
+      if (!success) {
+        String message = "NodeDb job did not succeed, job status:"
+            + nodeJob.getStatus().getState() + ", reason: "
+            + nodeJob.getStatus().getFailureInfo();
+        LOG.error(message);
+        // remove lock file and and temporary directory if an error occurs
+        NutchJob.cleanupAfterFailure(tempNodeDb, lock, fs);
+        throw new RuntimeException(message);
+      }
       LOG.info("NodeDb: installing " + nodeDb);
       FSUtils.replace(fs, nodeDb, tempNodeDb, true);
       LOG.info("NodeDb: finished");
     } catch (IOException | InterruptedException | ClassNotFoundException e) {
-
+      LOG.error("NodeDb failed:", e);
       // remove lock file and and temporary directory if an error occurs
-      LockUtil.removeLockFile(fs, lock);
-      if (fs.exists(tempNodeDb)) {
-        fs.delete(tempNodeDb, true);
-      }
-      LOG.error(StringUtils.stringifyException(e));
+      NutchJob.cleanupAfterFailure(tempNodeDb, lock, fs);
       throw e;
     }
 
diff --git a/src/java/org/apache/nutch/segment/SegmentReader.java 
b/src/java/org/apache/nutch/segment/SegmentReader.java
index 0b65a2b81..325635f9c 100644
--- a/src/java/org/apache/nutch/segment/SegmentReader.java
+++ b/src/java/org/apache/nutch/segment/SegmentReader.java
@@ -52,7 +52,6 @@
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
 import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Mapper.Context;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -60,13 +59,13 @@
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.LinkDbReader;
 import org.apache.nutch.crawl.NutchWritable;
 import org.apache.nutch.parse.ParseData;
 import org.apache.nutch.parse.ParseText;
 import org.apache.nutch.protocol.Content;
 import org.apache.nutch.util.HadoopFSUtil;
 import org.apache.nutch.util.NutchConfiguration;
-import org.apache.nutch.util.NutchJob;
 import org.apache.nutch.util.SegmentReaderUtil;
 
 /** Dump the content of a segment. */
@@ -158,20 +157,9 @@ public void setup(Job job) {
       this.pt = conf.getBoolean("segment.reader.pt", true);
     }
 
-  private Configuration createJobConf() throws IOException {
-    Job job = NutchJob.getInstance(getConf());
-    Configuration conf = job.getConfiguration();
-    conf.setBoolean("segment.reader.co", this.co);
-    conf.setBoolean("segment.reader.fe", this.fe);
-    conf.setBoolean("segment.reader.ge", this.ge);
-    conf.setBoolean("segment.reader.pa", this.pa);
-    conf.setBoolean("segment.reader.pd", this.pd);
-    conf.setBoolean("segment.reader.pt", this.pt);
-    return conf;
-  }
-
   public void close() {
   }
+
   public static class InputCompatReducer extends
       Reducer<Text, NutchWritable, Text, Text> {
 
@@ -232,6 +220,7 @@ public void dump(Path segment, Path output) throws 
IOException,
     job.setInputFormatClass(SequenceFileInputFormat.class);
     job.setMapperClass(InputCompatMapper.class);
     job.setReducerClass(InputCompatReducer.class);
+    job.setJarByClass(SegmentReader.class);
 
     Path tempDir = new Path(conf.get("hadoop.tmp.dir", "/tmp") + "/segread-"
         + new java.util.Random().nextInt());
@@ -459,7 +448,7 @@ public void run() {
     Class<?> valueClass = readers[0].getValueClass();
     if (!keyClass.getName().equals("org.apache.hadoop.io.Text"))
       throw new IOException("Incompatible key (" + keyClass.getName() + ")");
-    WritableComparable aKey = (WritableComparable) keyClass.newInstance();
+    WritableComparable<?> aKey = (WritableComparable<?>) 
keyClass.newInstance();
     Writable value = (Writable) valueClass.newInstance();
     for (int i = 0; i < readers.length; i++) {
       while (readers[i].next(aKey, value)) {
diff --git a/src/test/org/apache/nutch/crawl/TestCrawlDbFilter.java 
b/src/test/org/apache/nutch/crawl/TestCrawlDbFilter.java
index 1ef3e3b44..db083804d 100644
--- a/src/test/org/apache/nutch/crawl/TestCrawlDbFilter.java
+++ b/src/test/org/apache/nutch/crawl/TestCrawlDbFilter.java
@@ -108,6 +108,7 @@ public void testUrl404Purging() throws Exception {
     job.setOutputFormatClass(MapFileOutputFormat.class);
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(CrawlDatum.class);
+    job.setJarByClass(CrawlDbFilter.class);
     job.waitForCompletion(true);
 
     Path fetchlist = new Path(new Path(newCrawlDb, "part-r-00000"), "data");


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> ClassNotFoundException when running in (pseudo-)distributed mode
> ----------------------------------------------------------------
>
>                 Key: NUTCH-2569
>                 URL: https://issues.apache.org/jira/browse/NUTCH-2569
>             Project: Nutch
>          Issue Type: Bug
>    Affects Versions: 1.15
>            Reporter: Sebastian Nagel
>            Assignee: Sebastian Nagel
>            Priority: Blocker
>             Fix For: 1.15
>
>
> The CrawlDb / updatedb job fails in pseudo-distributed mode with a 
> ClassNotFoundException:
> {noformat}
> 18/04/22 19:24:49 INFO mapreduce.Job: Task Id : 
> attempt_1524395182329_0018_m_000000_0, Status : FAILED
> Error: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class 
> org.apache.nutch.crawl.CrawlDbFilter not found
>         at 
> org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2369)
>         at 
> org.apache.hadoop.mapreduce.task.JobContextImpl.getMapperClass(JobContextImpl.java:186)
>         at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:745)
>         at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
>         at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:175)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:422)
>         at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
>         at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:169)
> Caused by: java.lang.ClassNotFoundException: Class 
> org.apache.nutch.crawl.CrawlDbFilter not found
>         at 
> org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2273)
>         at 
> org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2367)
> {noformat}
> Must define the job jar by calling {{job.setJarByClass(...)}}. This also 
> affects other jobs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to