[ 
https://issues.apache.org/jira/browse/NUTCH-2517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16398765#comment-16398765
 ] 

ASF GitHub Bot commented on NUTCH-2517:
---------------------------------------

lewismc closed pull request #293: NUTCH-2517 mergesegs corrupts segment data
URL: https://github.com/apache/nutch/pull/293
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/src/java/org/apache/nutch/crawl/LinkDb.java 
b/src/java/org/apache/nutch/crawl/LinkDb.java
index b7a89b229..c6a32ba86 100644
--- a/src/java/org/apache/nutch/crawl/LinkDb.java
+++ b/src/java/org/apache/nutch/crawl/LinkDb.java
@@ -17,31 +17,39 @@
 
 package org.apache.nutch.crawl;
 
-import java.io.*;
+import java.io.File;
+import java.io.IOException;
 import java.lang.invoke.MethodHandles;
+import java.net.MalformedURLException;
+import java.net.URL;
 import java.text.SimpleDateFormat;
-import java.util.*;
-import java.net.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
 
-// Commons Logging imports
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
-import org.apache.hadoop.mapreduce.Mapper.Context;
-import org.apache.hadoop.util.*;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
 import org.apache.nutch.metadata.Nutch;
 import org.apache.nutch.net.URLFilters;
 import org.apache.nutch.net.URLNormalizers;
-import org.apache.nutch.parse.*;
+import org.apache.nutch.parse.Outlink;
+import org.apache.nutch.parse.ParseData;
 import org.apache.nutch.util.HadoopFSUtil;
 import org.apache.nutch.util.LockUtil;
 import org.apache.nutch.util.NutchConfiguration;
@@ -53,7 +61,7 @@
 public class LinkDb extends NutchTool implements Tool {
 
   private static final Logger LOG = LoggerFactory
-      .getLogger(MethodHandles.lookup().lookupClass());
+          .getLogger(MethodHandles.lookup().lookupClass());
 
   public static final String IGNORE_INTERNAL_LINKS = 
"linkdb.ignore.internal.links";
   public static final String IGNORE_EXTERNAL_LINKS = 
"linkdb.ignore.external.links";
@@ -62,6 +70,7 @@
   public static final String LOCK_NAME = ".locked";
 
   public LinkDb() {
+    //default constructor
   }
 
   public LinkDb(Configuration conf) {
@@ -69,7 +78,7 @@ public LinkDb(Configuration conf) {
   }
 
   public static class LinkDbMapper extends 
-      Mapper<Text, ParseData, Text, Inlinks> {
+  Mapper<Text, ParseData, Text, Inlinks> {
     private int maxAnchorLength;
     private boolean ignoreInternalLinks;
     private boolean ignoreExternalLinks;
@@ -94,17 +103,16 @@ public void cleanup(){
     }
 
     public void map(Text key, ParseData parseData,
-        Context context)
-        throws IOException, InterruptedException {
+            Context context)
+                    throws IOException, InterruptedException {
       String fromUrl = key.toString();
       String fromHost = getHost(fromUrl);
       if (urlNormalizers != null) {
         try {
           fromUrl = urlNormalizers
-              .normalize(fromUrl, URLNormalizers.SCOPE_LINKDB); // normalize 
the
-                                                                // url
+                  .normalize(fromUrl, URLNormalizers.SCOPE_LINKDB); // 
normalize the url
         } catch (Exception e) {
-          LOG.warn("Skipping " + fromUrl + ":" + e);
+          LOG.warn("Skipping {} :", fromUrl, e);
           fromUrl = null;
         }
       }
@@ -112,7 +120,7 @@ public void map(Text key, ParseData parseData,
         try {
           fromUrl = urlFilters.filter(fromUrl); // filter the url
         } catch (Exception e) {
-          LOG.warn("Skipping " + fromUrl + ":" + e);
+          LOG.warn("Skipping {} :", fromUrl, e);
           fromUrl = null;
         }
       }
@@ -131,17 +139,16 @@ public void map(Text key, ParseData parseData,
           }
         } else if (ignoreExternalLinks) {
           String toHost = getHost(toUrl);
-          if (toHost == null || !toHost.equals(fromHost)) { // external link
-            continue;                               // skip it
+          if (toHost == null || !toHost.equals(fromHost)) { // external link 
skip it
+            continue;
           }
         }
         if (urlNormalizers != null) {
           try {
-            toUrl = urlNormalizers.normalize(toUrl, 
URLNormalizers.SCOPE_LINKDB); // normalize
-                                                                               
   // the
-                                                                               
   // url
+            // normalize the url
+            toUrl = urlNormalizers.normalize(toUrl, 
URLNormalizers.SCOPE_LINKDB); 
           } catch (Exception e) {
-            LOG.warn("Skipping " + toUrl + ":" + e);
+            LOG.warn("Skipping {} :", toUrl, e);
             toUrl = null;
           }
         }
@@ -149,7 +156,7 @@ public void map(Text key, ParseData parseData,
           try {
             toUrl = urlFilters.filter(toUrl); // filter the url
           } catch (Exception e) {
-            LOG.warn("Skipping " + toUrl + ":" + e);
+            LOG.warn("Skipping {} :", toUrl, e);
             toUrl = null;
           }
         }
@@ -175,15 +182,15 @@ private static String getHost(String url) {
   }
 
   public void invert(Path linkDb, final Path segmentsDir, boolean normalize,
-      boolean filter, boolean force) throws IOException, InterruptedException, 
ClassNotFoundException {
+          boolean filter, boolean force) throws IOException, 
InterruptedException, ClassNotFoundException {
     FileSystem fs = segmentsDir.getFileSystem(getConf());
     FileStatus[] files = fs.listStatus(segmentsDir,
-        HadoopFSUtil.getPassDirectoriesFilter(fs));
+            HadoopFSUtil.getPassDirectoriesFilter(fs));
     invert(linkDb, HadoopFSUtil.getPaths(files), normalize, filter, force);
   }
 
   public void invert(Path linkDb, Path[] segments, boolean normalize,
-      boolean filter, boolean force) throws IOException, InterruptedException, 
ClassNotFoundException {
+          boolean filter, boolean force) throws IOException, 
InterruptedException, ClassNotFoundException {
     Job job = LinkDb.createJob(getConf(), linkDb, normalize, filter);
     Path lock = new Path(linkDb, LOCK_NAME);
     FileSystem fs = linkDb.getFileSystem(getConf());
@@ -194,10 +201,10 @@ public void invert(Path linkDb, Path[] segments, boolean 
normalize,
     SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
     long start = System.currentTimeMillis();
     if (LOG.isInfoEnabled()) {
-      LOG.info("LinkDb: starting at " + sdf.format(start));
-      LOG.info("LinkDb: linkdb: " + linkDb);
-      LOG.info("LinkDb: URL normalize: " + normalize);
-      LOG.info("LinkDb: URL filter: " + filter);
+      LOG.info("LinkDb: starting at {}", sdf.format(start));
+      LOG.info("LinkDb: linkdb: {}", linkDb);
+      LOG.info("LinkDb: URL normalize: {}", normalize);
+      LOG.info("LinkDb: URL filter: {}", filter);
       if (conf.getBoolean(IGNORE_INTERNAL_LINKS, true)) {
         LOG.info("LinkDb: internal links will be ignored.");
       }
@@ -206,30 +213,30 @@ public void invert(Path linkDb, Path[] segments, boolean 
normalize,
       }
     }
     if (conf.getBoolean(IGNORE_INTERNAL_LINKS, true)
-        && conf.getBoolean(IGNORE_EXTERNAL_LINKS, false)) {
+            && conf.getBoolean(IGNORE_EXTERNAL_LINKS, false)) {
       LOG.warn("LinkDb: internal and external links are ignored! "
-          + "Nothing to do, actually. Exiting.");
+              + "Nothing to do, actually. Exiting.");
       LockUtil.removeLockFile(fs, lock);
       return;
     }
 
     for (int i = 0; i < segments.length; i++) {
       if (LOG.isInfoEnabled()) {
-        LOG.info("LinkDb: adding segment: " + segments[i]);
+        LOG.info("LinkDb: adding segment: {}", segments[i]);
       }
       FileInputFormat.addInputPath(job, new Path(segments[i],
-          ParseData.DIR_NAME));
+              ParseData.DIR_NAME));
     }
     try {
       int complete = job.waitForCompletion(true)?0:1;
-    } catch (IOException | InterruptedException | ClassNotFoundException e) {
+    } catch (IOException | InterruptedException | ClassNotFoundException  e) {
       LockUtil.removeLockFile(fs, lock);
       throw e;
     }
 
     if (fs.exists(currentLinkDb)) {
       if (LOG.isInfoEnabled()) {
-        LOG.info("LinkDb: merging with existing linkdb: " + linkDb);
+        LOG.info("LinkDb: merging with existing linkdb: {}", linkDb);
       }
       // try to merge
       Path newLinkDb = FileOutputFormat.getOutputPath(job);
@@ -248,14 +255,13 @@ public void invert(Path linkDb, Path[] segments, boolean 
normalize,
     LinkDb.install(job, linkDb);
 
     long end = System.currentTimeMillis();
-    LOG.info("LinkDb: finished at " + sdf.format(end) + ", elapsed: "
-        + TimingUtil.elapsedTime(start, end));
+    LOG.info("LinkDb: finished at {}, elapsed: {}", sdf.format(end), 
TimingUtil.elapsedTime(start, end));
   }
 
   private static Job createJob(Configuration config, Path linkDb,
-      boolean normalize, boolean filter) throws IOException {
+          boolean normalize, boolean filter) throws IOException {
     Path newLinkDb = new Path(linkDb,
-        Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+            Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
 
     Job job = NutchJob.getInstance(config);
     Configuration conf = job.getConfiguration();
@@ -265,7 +271,7 @@ private static Job createJob(Configuration config, Path 
linkDb,
 
     job.setJarByClass(LinkDb.class);
     job.setMapperClass(LinkDb.LinkDbMapper.class);
-    
+
     job.setJarByClass(LinkDbMerger.class);
     job.setCombinerClass(LinkDbMerger.LinkDbMergeReducer.class);
     // if we don't run the mergeJob, perform normalization/filtering now
@@ -277,7 +283,7 @@ private static Job createJob(Configuration config, Path 
linkDb,
           conf.setBoolean(LinkDbFilter.URL_NORMALIZING, normalize);
         }
       } catch (Exception e) {
-        LOG.warn("LinkDb createJob: " + e);
+        LOG.warn("LinkDb createJob: {}", e);
       }
     }
     job.setReducerClass(LinkDbMerger.LinkDbMergeReducer.class);
@@ -317,13 +323,13 @@ public static void main(String[] args) throws Exception {
   public int run(String[] args) throws Exception {
     if (args.length < 2) {
       System.err
-          .println("Usage: LinkDb <linkdb> (-dir <segmentsDir> | <seg1> <seg2> 
...) [-force] [-noNormalize] [-noFilter]");
+      .println("Usage: LinkDb <linkdb> (-dir <segmentsDir> | <seg1> <seg2> 
...) [-force] [-noNormalize] [-noFilter]");
       System.err.println("\tlinkdb\toutput LinkDb to create or update");
       System.err
-          .println("\t-dir segmentsDir\tparent directory of several segments, 
OR");
+      .println("\t-dir segmentsDir\tparent directory of several segments, OR");
       System.err.println("\tseg1 seg2 ...\t list of segment directories");
       System.err
-          .println("\t-force\tforce update even if LinkDb appears to be locked 
(CAUTION advised)");
+      .println("\t-force\tforce update even if LinkDb appears to be locked 
(CAUTION advised)");
       System.err.println("\t-noNormalize\tdon't normalize link URLs");
       System.err.println("\t-noFilter\tdon't apply URLFilters to link URLs");
       return -1;
@@ -334,17 +340,17 @@ public int run(String[] args) throws Exception {
     boolean normalize = true;
     boolean force = false;
     for (int i = 1; i < args.length; i++) {
-      if (args[i].equals("-dir")) {
+      if ("-dir".equals(args[i])) {
         Path segDir = new Path(args[++i]);
         FileSystem fs = segDir.getFileSystem(getConf());
         FileStatus[] paths = fs.listStatus(segDir,
-            HadoopFSUtil.getPassDirectoriesFilter(fs));
+                HadoopFSUtil.getPassDirectoriesFilter(fs));
         segs.addAll(Arrays.asList(HadoopFSUtil.getPaths(paths)));
-      } else if (args[i].equalsIgnoreCase("-noNormalize")) {
+      } else if ("-noNormalize".equalsIgnoreCase(args[i])) {
         normalize = false;
-      } else if (args[i].equalsIgnoreCase("-noFilter")) {
+      } else if ("-noFilter".equalsIgnoreCase(args[i])) {
         filter = false;
-      } else if (args[i].equalsIgnoreCase("-force")) {
+      } else if ("-force".equalsIgnoreCase(args[i])) {
         force = true;
       } else
         segs.add(new Path(args[i]));
@@ -353,7 +359,7 @@ public int run(String[] args) throws Exception {
       invert(db, segs.toArray(new Path[segs.size()]), normalize, filter, 
force);
       return 0;
     } catch (Exception e) {
-      LOG.error("LinkDb: " + StringUtils.stringifyException(e));
+      LOG.error("LinkDb: {}", StringUtils.stringifyException(e));
       return -1;
     }
   }
@@ -406,25 +412,25 @@ public int run(String[] args) throws Exception {
       }
       FileSystem fs = segmentsDir.getFileSystem(getConf());
       FileStatus[] paths = fs.listStatus(segmentsDir,
-          HadoopFSUtil.getPassDirectoriesFilter(fs));
+              HadoopFSUtil.getPassDirectoriesFilter(fs));
       segs.addAll(Arrays.asList(HadoopFSUtil.getPaths(paths)));
     }
     else if(args.containsKey(Nutch.ARG_SEGMENTS)) {
       Object segments = args.get(Nutch.ARG_SEGMENTS);
-      ArrayList<String> segmentList = new ArrayList<String>(); 
+      ArrayList<String> segmentList = new ArrayList<>(); 
       if(segments instanceof ArrayList) {
-       segmentList = (ArrayList<String>)segments; }
+        segmentList = (ArrayList<String>)segments; }
       else if(segments instanceof Path){
         segmentList.add(segments.toString());
       }
-             
+
       for(String segment: segmentList) {
-       segs.add(new Path(segment));
+        segs.add(new Path(segment));
       }
-     }
+    }
     else {
-      String segment_dir = crawlId+"/segments";
-      File dir = new File(segment_dir);
+      String segmentDir = crawlId+"/segments";
+      File dir = new File(segmentDir);
       File[] segmentsList = dir.listFiles();  
       Arrays.sort(segmentsList, (f1, f2) -> {
         if(f1.lastModified()>f2.lastModified())
@@ -439,7 +445,7 @@ else if(segments instanceof Path){
       results.put(Nutch.VAL_RESULT, Integer.toString(0));
       return results;
     } catch (Exception e) {
-      LOG.error("LinkDb: " + StringUtils.stringifyException(e));
+      LOG.error("LinkDb: {}", StringUtils.stringifyException(e));
       results.put(Nutch.VAL_RESULT, Integer.toString(-1));
       return results;
     }
diff --git a/src/java/org/apache/nutch/segment/SegmentMerger.java 
b/src/java/org/apache/nutch/segment/SegmentMerger.java
index dcff8e2ce..780e10a6f 100644
--- a/src/java/org/apache/nutch/segment/SegmentMerger.java
+++ b/src/java/org/apache/nutch/segment/SegmentMerger.java
@@ -47,7 +47,6 @@
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Mapper.Context;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.Reducer;
@@ -55,7 +54,6 @@
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.nutch.crawl.CrawlDatum;
@@ -123,7 +121,7 @@
  */
 public class SegmentMerger extends Configured implements Tool{
   private static final Logger LOG = LoggerFactory
-      .getLogger(MethodHandles.lookup().lookupClass());
+          .getLogger(MethodHandles.lookup().lookupClass());
 
   private static final String SEGMENT_PART_KEY = "part";
   private static final String SEGMENT_SLICE_KEY = "slice";
@@ -133,16 +131,16 @@
    * in reduce and use additional metadata.
    */
   public static class ObjectInputFormat extends
-      SequenceFileInputFormat<Text, MetaWrapper> {
+  SequenceFileInputFormat<Text, MetaWrapper> {
 
     @Override
     public RecordReader<Text, MetaWrapper> createRecordReader(
-        final InputSplit split, TaskAttemptContext context)
-        throws IOException {
+            final InputSplit split, TaskAttemptContext context)
+                    throws IOException {
 
       context.setStatus(split.toString());
       Configuration conf = context.getConfiguration();
-      
+
       // find part name
       SegmentPart segmentPart;
       final String spString;
@@ -164,17 +162,15 @@
 
         @Override
         public void initialize(InputSplit split, TaskAttemptContext context) 
-            throws IOException, InterruptedException {
+                throws IOException, InterruptedException {
           splitReader.initialize(split, context);
         }
 
 
         @Override
         public synchronized boolean nextKeyValue()
-            throws IOException, InterruptedException {
+                throws IOException, InterruptedException {
           try {
-            LOG.debug("Running OIF.nextKeyValue()");
-
             boolean res = splitReader.nextKeyValue();
             if(res == false) {
               return res;
@@ -210,23 +206,24 @@ public synchronized void close() throws IOException {
   }
 
   public static class SegmentOutputFormat extends
-      FileOutputFormat<Text, MetaWrapper> {
+  FileOutputFormat<Text, MetaWrapper> {
     private static final String DEFAULT_SLICE = "default";
 
+    @Override
     public RecordWriter<Text, MetaWrapper> getRecordWriter(TaskAttemptContext 
context)
-        throws IOException {
+            throws IOException {
       Configuration conf = context.getConfiguration();
       String name = context.getTaskAttemptID().toString();
       Path dir = FileOutputFormat.getOutputPath(context);
       FileSystem fs = dir.getFileSystem(context.getConfiguration());
 
       return new RecordWriter<Text, MetaWrapper>() {
-        MapFile.Writer c_out = null;
-        MapFile.Writer f_out = null;
-        MapFile.Writer pd_out = null;
-        MapFile.Writer pt_out = null;
-        SequenceFile.Writer g_out = null;
-        SequenceFile.Writer p_out = null;
+        MapFile.Writer cOut = null;
+        MapFile.Writer fOut = null;
+        MapFile.Writer pdOut = null;
+        MapFile.Writer ptOut = null;
+        SequenceFile.Writer gOut = null;
+        SequenceFile.Writer pOut = null;
         HashMap<String, Closeable> sliceWriters = new HashMap<>();
         String segmentName = conf.get("segment.merger.segmentName");
 
@@ -237,116 +234,108 @@ public void write(Text key, MetaWrapper wrapper) throws 
IOException {
           String slice = wrapper.getMeta(SEGMENT_SLICE_KEY);
           if (o instanceof CrawlDatum) {
             if (sp.partName.equals(CrawlDatum.GENERATE_DIR_NAME)) {
-              g_out = ensureSequenceFile(slice, CrawlDatum.GENERATE_DIR_NAME);
-              g_out.append(key, o);
+              gOut = ensureSequenceFile(slice, CrawlDatum.GENERATE_DIR_NAME);
+              gOut.append(key, o);
             } else if (sp.partName.equals(CrawlDatum.FETCH_DIR_NAME)) {
-              f_out = ensureMapFile(slice, CrawlDatum.FETCH_DIR_NAME,
-                  CrawlDatum.class);
-              f_out.append(key, o);
+              fOut = ensureMapFile(slice, CrawlDatum.FETCH_DIR_NAME,
+                      CrawlDatum.class);
+              fOut.append(key, o);
             } else if (sp.partName.equals(CrawlDatum.PARSE_DIR_NAME)) {
-              p_out = ensureSequenceFile(slice, CrawlDatum.PARSE_DIR_NAME);
-              p_out.append(key, o);
+              pOut = ensureSequenceFile(slice, CrawlDatum.PARSE_DIR_NAME);
+              pOut.append(key, o);
             } else {
               throw new IOException("Cannot determine segment part: "
-                  + sp.partName);
+                      + sp.partName);
             }
           } else if (o instanceof Content) {
-            c_out = ensureMapFile(slice, Content.DIR_NAME, Content.class);
-            c_out.append(key, o);
+            cOut = ensureMapFile(slice, Content.DIR_NAME, Content.class);
+            cOut.append(key, o);
           } else if (o instanceof ParseData) {
             // update the segment name inside contentMeta - required by Indexer
             if (slice == null) {
               ((ParseData) o).getContentMeta().set(Nutch.SEGMENT_NAME_KEY,
-                  segmentName);
+                      segmentName);
             } else {
               ((ParseData) o).getContentMeta().set(Nutch.SEGMENT_NAME_KEY,
-                  segmentName + "-" + slice);
+                      segmentName + "-" + slice);
             }
-            pd_out = ensureMapFile(slice, ParseData.DIR_NAME, ParseData.class);
-            pd_out.append(key, o);
+            pdOut = ensureMapFile(slice, ParseData.DIR_NAME, ParseData.class);
+            pdOut.append(key, o);
           } else if (o instanceof ParseText) {
-            pt_out = ensureMapFile(slice, ParseText.DIR_NAME, ParseText.class);
-            pt_out.append(key, o);
+            ptOut = ensureMapFile(slice, ParseText.DIR_NAME, ParseText.class);
+            ptOut.append(key, o);
           }
         }
 
         // lazily create SequenceFile-s.
         private SequenceFile.Writer ensureSequenceFile(String slice,
-            String dirName) throws IOException {
+                String dirName) throws IOException {
           if (slice == null)
             slice = DEFAULT_SLICE;
           SequenceFile.Writer res = (SequenceFile.Writer) sliceWriters
-              .get(slice + dirName);
+                  .get(slice + dirName);
           if (res != null)
             return res;
           Path wname;
           Path out = FileOutputFormat.getOutputPath(context);
           if (slice == DEFAULT_SLICE) {
             wname = new Path(new Path(new Path(out, segmentName), dirName),
-                name);
+                    name);
           } else {
             wname = new Path(new Path(new Path(out, segmentName + "-" + slice),
-                dirName), name);
+                    dirName), name);
           }
-          
-//          Option rKeyClassOpt = MapFile.Writer.keyClass(Text.class);
-//          org.apache.hadoop.io.SequenceFile.Writer.Option rValClassOpt = 
SequenceFile.Writer.valueClass(CrawlDatum.class);
-//          Option rProgressOpt = (Option) 
SequenceFile.Writer.progressable(progress);
-//          Option rCompOpt = (Option) 
SequenceFile.Writer.compression(SequenceFileOutputFormat.getOutputCompressionType(job));
-//          Option rFileOpt = (Option) SequenceFile.Writer.file(wname);
-          
-          //res = SequenceFile.createWriter(job, rFileOpt, rKeyClassOpt,
-           //   rValClassOpt, rCompOpt, rProgressOpt);
-          
+
           res = SequenceFile.createWriter(conf, 
SequenceFile.Writer.file(wname),
-              SequenceFile.Writer.keyClass(Text.class),
-              SequenceFile.Writer.valueClass(CrawlDatum.class),
-              
SequenceFile.Writer.bufferSize(fs.getConf().getInt("io.file.buffer.size",4096)),
-              SequenceFile.Writer.replication(fs.getDefaultReplication(wname)),
-              SequenceFile.Writer.blockSize(1073741824),
-              
SequenceFile.Writer.compression(SequenceFileOutputFormat.getOutputCompressionType(context),
 new DefaultCodec()),
-              SequenceFile.Writer.progressable((Progressable)context),
-              SequenceFile.Writer.metadata(new Metadata())); 
-          
+                  SequenceFile.Writer.keyClass(Text.class),
+                  SequenceFile.Writer.valueClass(CrawlDatum.class),
+                  
SequenceFile.Writer.bufferSize(fs.getConf().getInt("io.file.buffer.size",4096)),
+                  
SequenceFile.Writer.replication(fs.getDefaultReplication(wname)),
+                  SequenceFile.Writer.blockSize(1073741824),
+                  
SequenceFile.Writer.compression(SequenceFileOutputFormat.getOutputCompressionType(context),
 new DefaultCodec()),
+                  SequenceFile.Writer.progressable((Progressable)context),
+                  SequenceFile.Writer.metadata(new Metadata())); 
+
           sliceWriters.put(slice + dirName, res);
           return res;
         }
 
         // lazily create MapFile-s.
         private MapFile.Writer ensureMapFile(String slice, String dirName,
-            Class<? extends Writable> clazz) throws IOException {
+                Class<? extends Writable> clazz) throws IOException {
           if (slice == null)
             slice = DEFAULT_SLICE;
           MapFile.Writer res = (MapFile.Writer) sliceWriters.get(slice
-              + dirName);
+                  + dirName);
           if (res != null)
             return res;
           Path wname;
           Path out = FileOutputFormat.getOutputPath(context);
           if (slice == DEFAULT_SLICE) {
             wname = new Path(new Path(new Path(out, segmentName), dirName),
-                name);
+                    name);
           } else {
             wname = new Path(new Path(new Path(out, segmentName + "-" + slice),
-                dirName), name);
+                    dirName), name);
           }
           CompressionType compType = SequenceFileOutputFormat
-              .getOutputCompressionType(context);
+                  .getOutputCompressionType(context);
           if (clazz.isAssignableFrom(ParseText.class)) {
             compType = CompressionType.RECORD;
           }
-          
-          Option rKeyClassOpt = (Option) MapFile.Writer.keyClass(Text.class);
+
+          Option rKeyClassOpt = MapFile.Writer.keyClass(Text.class);
           org.apache.hadoop.io.SequenceFile.Writer.Option rValClassOpt = 
SequenceFile.Writer.valueClass(clazz);
           org.apache.hadoop.io.SequenceFile.Writer.Option rProgressOpt = 
SequenceFile.Writer.progressable((Progressable)context);
           org.apache.hadoop.io.SequenceFile.Writer.Option rCompOpt = 
SequenceFile.Writer.compression(compType);
-          
+
           res = new MapFile.Writer(conf, wname, rKeyClassOpt,
-              rValClassOpt, rCompOpt, rProgressOpt);
+                  rValClassOpt, rCompOpt, rProgressOpt);
           sliceWriters.put(slice + dirName, res);
           return res;
         }
 
+        @Override
         public void close(TaskAttemptContext context) throws IOException {
           Iterator<Closeable> it = sliceWriters.values().iterator();
           while (it.hasNext()) {
@@ -370,6 +359,7 @@ public SegmentMerger(Configuration conf) {
     super(conf);
   }
 
+  @Override
   public void setConf(Configuration conf) {
     super.setConf(conf);
   }
@@ -379,11 +369,11 @@ public void close() throws IOException {
 
 
   public static class SegmentMergerMapper extends
-      Mapper<Text, MetaWrapper, Text, MetaWrapper> {
-   
+  Mapper<Text, MetaWrapper, Text, MetaWrapper> {
+
     private URLFilters filters = null;
     private URLNormalizers normalizers = null;
- 
+
     @Override
     public void setup(Mapper<Text, MetaWrapper, Text, MetaWrapper>.Context 
context) {
       Configuration conf = context.getConfiguration();
@@ -396,14 +386,14 @@ public void setup(Mapper<Text, MetaWrapper, Text, 
MetaWrapper>.Context context)
 
     @Override
     public void map(Text key, MetaWrapper value,
-        Context context) throws IOException, InterruptedException {
+            Context context) throws IOException, InterruptedException {
       Text newKey = new Text();
       String url = key.toString();
       if (normalizers != null) {
         try {
           url = normalizers.normalize(url, URLNormalizers.SCOPE_DEFAULT); // 
normalize the url.
         } catch (Exception e) {
-          LOG.warn("Skipping " + url + ":" + e.getMessage());
+          LOG.warn("Skipping {} :", url, e.getMessage());
           url = null;
         }
       }
@@ -411,7 +401,7 @@ public void map(Text key, MetaWrapper value,
         try {
           url = filters.filter(url);
         } catch (Exception e) {
-          LOG.warn("Skipping key " + url + ": " + e.getMessage());
+          LOG.warn("Skipping key {} : ", url, e.getMessage());
           url = null;
         }
       }
@@ -429,7 +419,7 @@ public void map(Text key, MetaWrapper value,
    * order as their creation time increases.
    */
   public static class SegmentMergerReducer extends
-      Reducer<Text, MetaWrapper, Text, MetaWrapper> {
+  Reducer<Text, MetaWrapper, Text, MetaWrapper> {
 
     private SegmentMergeFilters mergeFilters = null;
     private long sliceSize = -1;
@@ -443,7 +433,7 @@ public void setup(Reducer<Text, MetaWrapper, Text, 
MetaWrapper>.Context context)
       }      
       sliceSize = conf.getLong("segment.merger.slice", -1);
       if ((sliceSize > 0) && (LOG.isInfoEnabled())) {
-        LOG.info("Slice size: " + sliceSize + " URLs.");
+        LOG.info("Slice size: {} URLs.", sliceSize);
       }
       if (sliceSize > 0) {
         sliceSize = sliceSize / 
Integer.parseInt(conf.get("mapreduce.job.reduces"));
@@ -452,7 +442,7 @@ public void setup(Reducer<Text, MetaWrapper, Text, 
MetaWrapper>.Context context)
 
     @Override
     public void reduce(Text key, Iterable<MetaWrapper> values,
-        Context context) throws IOException, InterruptedException {
+            Context context) throws IOException, InterruptedException {
       CrawlDatum lastG = null;
       CrawlDatum lastF = null;
       CrawlDatum lastSig = null;
@@ -492,8 +482,8 @@ public void reduce(Text key, Iterable<MetaWrapper> values,
             // https://issues.apache.org/jira/browse/NUTCH-1520
             // https://issues.apache.org/jira/browse/NUTCH-1113
             if (CrawlDatum.hasFetchStatus(val)
-                && val.getStatus() != CrawlDatum.STATUS_FETCH_RETRY
-                && val.getStatus() != CrawlDatum.STATUS_FETCH_NOTMODIFIED) {
+                    && val.getStatus() != CrawlDatum.STATUS_FETCH_RETRY
+                    && val.getStatus() != CrawlDatum.STATUS_FETCH_NOTMODIFIED) 
{
               if (lastF == null) {
                 lastF = val;
                 lastFname = sp.segmentName;
@@ -562,13 +552,13 @@ public void reduce(Text key, Iterable<MetaWrapper> values,
       }
       // perform filtering based on full merge record
       if (mergeFilters != null
-          && !mergeFilters.filter(key, lastG, lastF, lastSig, lastC, lastPD,
-              lastPT, linked.isEmpty() ? null : 
linked.lastEntry().getValue())) {
+              && !mergeFilters.filter(key, lastG, lastF, lastSig, lastC, 
lastPD,
+                      lastPT, linked.isEmpty() ? null : 
linked.lastEntry().getValue())) {
         return;
       }
 
       curCount++;
-      String sliceName = null;
+      String sliceName;
       MetaWrapper wrapper = new MetaWrapper();
       if (sliceSize > 0) {
         sliceName = String.valueOf(curCount / sliceSize);
@@ -634,11 +624,10 @@ public void reduce(Text key, Iterable<MetaWrapper> values,
   }
 
   public void merge(Path out, Path[] segs, boolean filter, boolean normalize,
-      long slice) throws Exception {
+          long slice) throws IOException, ClassNotFoundException, 
InterruptedException {
     String segmentName = Generator.generateSegmentName();
     if (LOG.isInfoEnabled()) {
-      LOG.info("Merging " + segs.length + " segments to " + out + "/"
-          + segmentName);
+      LOG.info("Merging {} segments to {}/{}", segs.length, out, segmentName);
     }
     Job job = NutchJob.getInstance(getConf());
     Configuration conf = job.getConfiguration();
@@ -654,7 +643,7 @@ public void merge(Path out, Path[] segs, boolean filter, 
boolean normalize,
     boolean c = true;
     boolean pd = true;
     boolean pt = true;
-    
+
     // These contain previous values, we use it to track changes in the loop
     boolean pg = true;
     boolean pf = true;
@@ -666,13 +655,13 @@ public void merge(Path out, Path[] segs, boolean filter, 
boolean normalize,
       FileSystem fs = segs[i].getFileSystem(conf);
       if (!fs.exists(segs[i])) {
         if (LOG.isWarnEnabled()) {
-          LOG.warn("Input dir " + segs[i] + " doesn't exist, skipping.");
+          LOG.warn("Input dir {} doesn't exist, skipping.", segs[i]);
         }
         segs[i] = null;
         continue;
       }
       if (LOG.isInfoEnabled()) {
-        LOG.info("SegmentMerger:   adding " + segs[i]);
+        LOG.info("SegmentMerger:   adding {}", segs[i]);
       }
       Path cDir = new Path(segs[i], Content.DIR_NAME);
       Path gDir = new Path(segs[i], CrawlDatum.GENERATE_DIR_NAME);
@@ -686,15 +675,15 @@ public void merge(Path out, Path[] segs, boolean filter, 
boolean normalize,
       p = p && fs.exists(pDir);
       pd = pd && fs.exists(pdDir);
       pt = pt && fs.exists(ptDir);
-      
+
       // Input changed?
       if (g != pg || f != pf || p != pp || c != pc || pd != ppd || pt != ppt) {
-        LOG.info(segs[i] + " changed input dirs");
+        LOG.info("{} changed input dirs", segs[i]);
       }
-      
+
       pg = g; pf = f; pp = p; pc = c; ppd = pd; ppt = pt;
     }
-    StringBuffer sb = new StringBuffer();
+    StringBuilder sb = new StringBuilder();
     if (c)
       sb.append(" " + Content.DIR_NAME);
     if (g)
@@ -708,7 +697,7 @@ public void merge(Path out, Path[] segs, boolean filter, 
boolean normalize,
     if (pt)
       sb.append(" " + ParseText.DIR_NAME);
     if (LOG.isInfoEnabled()) {
-      LOG.info("SegmentMerger: using segment data from:" + sb.toString());
+      LOG.info("SegmentMerger: using segment data from: {}", sb.toString());
     }
     for (int i = 0; i < segs.length; i++) {
       if (segs[i] == null)
@@ -758,18 +747,18 @@ public void merge(Path out, Path[] segs, boolean filter, 
boolean normalize,
   public int run(String[] args)  throws Exception {
     if (args.length < 2) {
       System.err
-          .println("SegmentMerger output_dir (-dir segments | seg1 seg2 ...) 
[-filter] [-slice NNNN]");
+      .println("SegmentMerger output_dir (-dir segments | seg1 seg2 ...) 
[-filter] [-slice NNNN]");
       System.err
-          .println("\toutput_dir\tname of the parent dir for output segment 
slice(s)");
+      .println("\toutput_dir\tname of the parent dir for output segment 
slice(s)");
       System.err
-          .println("\t-dir segments\tparent dir containing several segments");
+      .println("\t-dir segments\tparent dir containing several segments");
       System.err.println("\tseg1 seg2 ...\tlist of segment dirs");
       System.err
-          .println("\t-filter\t\tfilter out URL-s prohibited by current 
URLFilters");
+      .println("\t-filter\t\tfilter out URL-s prohibited by current 
URLFilters");
       System.err
-          .println("\t-normalize\t\tnormalize URL via current URLNormalizers");
+      .println("\t-normalize\t\tnormalize URL via current URLNormalizers");
       System.err
-          .println("\t-slice NNNN\tcreate many output segments, each 
containing NNNN URLs");
+      .println("\t-slice NNNN\tcreate many output segments, each containing 
NNNN URLs");
       return -1;
     }
     Configuration conf = NutchConfiguration.create();
@@ -779,37 +768,37 @@ public int run(String[] args)  throws Exception {
     boolean filter = false;
     boolean normalize = false;
     for (int i = 1; i < args.length; i++) {
-      if (args[i].equals("-dir")) {
+      if ("-dir".equals(args[i])) {
         Path dirPath = new Path(args[++i]);
         FileSystem fs = dirPath.getFileSystem(conf);
         FileStatus[] fstats = fs.listStatus(dirPath,
-            HadoopFSUtil.getPassDirectoriesFilter(fs));
+                HadoopFSUtil.getPassDirectoriesFilter(fs));
         Path[] files = HadoopFSUtil.getPaths(fstats);
         for (int j = 0; j < files.length; j++)
           segs.add(files[j]);
-      } else if (args[i].equals("-filter")) {
+      } else if ("-filter".equals(args[i])) {
         filter = true;
-      } else if (args[i].equals("-normalize")) {
+      } else if ("-normalize".equals(args[i])) {
         normalize = true;
-      } else if (args[i].equals("-slice")) {
+      } else if ("-slice".equals(args[i])) {
         sliceSize = Long.parseLong(args[++i]);
       } else {
         segs.add(new Path(args[i]));
       }
     }
-    if (segs.size() == 0) {
+    if (segs.isEmpty()) {
       System.err.println("ERROR: No input segments.");
       return -1;
     }
 
     merge(out, segs.toArray(new Path[segs.size()]), filter, normalize,
-        sliceSize);
+            sliceSize);
     return 0;
   }
 
   public static void main(String[] args) throws Exception {
     int result = ToolRunner.run(NutchConfiguration.create(),
-        new SegmentMerger(), args);
+            new SegmentMerger(), args);
     System.exit(result);
   }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> mergesegs corrupts segment data
> -------------------------------
>
>                 Key: NUTCH-2517
>                 URL: https://issues.apache.org/jira/browse/NUTCH-2517
>             Project: Nutch
>          Issue Type: Bug
>          Components: segment
>    Affects Versions: 1.15
>         Environment: xubuntu 17.10, docker container of apache/nutch LATEST
>            Reporter: Marco Ebbinghaus
>            Assignee: Lewis John McGibbney
>            Priority: Blocker
>              Labels: mapreduce, mergesegs
>             Fix For: 1.15
>
>         Attachments: Screenshot_2018-03-03_18-09-28.png, 
> Screenshot_2018-03-07_07-50-05.png
>
>
> The problem probably occurs since commit 
> [https://github.com/apache/nutch/commit/54510e503f7da7301a59f5f0e5bf4509b37d35b4]
> How to reproduce:
>  * create container from apache/nutch image (latest)
>  * open terminal in that container
>  * set http.agent.name
>  * create crawldir and urls file
>  * run bin/nutch inject (bin/nutch inject mycrawl/crawldb urls/urls)
>  * run bin/nutch generate (bin/nutch generate mycrawl/crawldb 
> mycrawl/segments 1)
>  ** this results in a segment (e.g. 20180304134215)
>  * run bin/nutch fetch (bin/nutch fetch mycrawl/segments/20180304134215 
> -threads 2)
>  * run bin/nutch parse (bin/nutch parse mycrawl/segments/20180304134215 
> -threads 2)
>  ** ls in the segment folder -> existing folders: content, crawl_fetch, 
> crawl_generate, crawl_parse, parse_data, parse_text
>  * run bin/nutch updatedb (bin/nutch updatedb mycrawl/crawldb 
> mycrawl/segments/20180304134215)
>  * run bin/nutch mergesegs (bin/nutch mergesegs mycrawl/MERGEDsegments 
> mycrawl/segments/* -filter)
>  ** console output: `SegmentMerger: using segment data from: content 
> crawl_generate crawl_fetch crawl_parse parse_data parse_text`
>  ** resulting segment: 20180304134535
>  * ls in mycrawl/MERGEDsegments/segment/20180304134535 -> only existing 
> folder: crawl_generate
>  * run bin/nutch invertlinks (bin/nutch invertlinks mycrawl/linkdb -dir 
> mycrawl/MERGEDsegments) which results in a consequential error
>  ** console output: `LinkDb: adding segment: 
> [file:/root/nutch_source/runtime/local/mycrawl/MERGEDsegments/20180304134535|file:///root/nutch_source/runtime/local/mycrawl/MERGEDsegments/20180304134535]
>  LinkDb: org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input 
> path does not exist: 
> [file:/root/nutch_source/runtime/local/mycrawl/MERGEDsegments/20180304134535/parse_data|file:///root/nutch_source/runtime/local/mycrawl/MERGEDsegments/20180304134535/parse_data]
>      at 
> org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:323)
>      at 
> org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:265)
>      at 
> org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.java:59)
>      at 
> org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:387)
>      at 
> org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:301)
>      at 
> org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:318)
>      at 
> org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:196)
>      at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1290)
>      at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1287)
>      at java.security.AccessController.doPrivileged(Native Method)
>      at javax.security.auth.Subject.doAs(Subject.java:422)
>      at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746)
>      at org.apache.hadoop.mapreduce.Job.submit(Job.java:1287)
>      at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1308)
>      at org.apache.nutch.crawl.LinkDb.invert(LinkDb.java:224)
>      at org.apache.nutch.crawl.LinkDb.run(LinkDb.java:353)
>      at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
>      at org.apache.nutch.crawl.LinkDb.main(LinkDb.java:313)`
> So as it seems mapreduce corrupts the segment folder during mergesegs command.
>  
> Pay attention to the fact that this issue is not related on trying to merge a 
> single segment like described above. As you can see on the attached 
> screenshot that problem also appears when executing multiple bin/nutch 
> generate/fetch/parse/updatedb commands before executing mergesegs - resulting 
> in a segment count > 1.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to