Modified: nutch/trunk/src/java/org/apache/nutch/scoring/webgraph/WebGraph.java
URL: 
http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/scoring/webgraph/WebGraph.java?rev=1655526&r1=1655525&r2=1655526&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/scoring/webgraph/WebGraph.java 
(original)
+++ nutch/trunk/src/java/org/apache/nutch/scoring/webgraph/WebGraph.java Thu 
Jan 29 05:38:59 2015
@@ -107,8 +107,9 @@ public class WebGraph extends Configured
    * by domain and host can be ignored. The number of Outlinks out to a given
    * page or domain can also be limited.
    */
-  public static class OutlinkDb extends Configured implements 
-      Mapper<Text, Writable, Text, NutchWritable>, Reducer<Text, 
NutchWritable, Text, LinkDatum> {
+  public static class OutlinkDb extends Configured implements
+      Mapper<Text, Writable, Text, NutchWritable>,
+      Reducer<Text, NutchWritable, Text, LinkDatum> {
 
     public static final String URL_NORMALIZING = "webgraph.url.normalizers";
     public static final String URL_FILTERING = "webgraph.url.filters";
@@ -133,7 +134,8 @@ public class WebGraph extends Configured
     /**
      * Normalizes and trims extra whitespace from the given url.
      * 
-     * @param url The url to normalize.
+     * @param url
+     *          The url to normalize.
      * 
      * @return The normalized url.
      */
@@ -151,8 +153,7 @@ public class WebGraph extends Configured
           normalized = urlNormalizers.normalize(url,
               URLNormalizers.SCOPE_DEFAULT);
           normalized = normalized.trim();
-        }
-        catch (Exception e) {
+        } catch (Exception e) {
           LOG.warn("Skipping " + url + ":" + e);
           normalized = null;
         }
@@ -162,9 +163,10 @@ public class WebGraph extends Configured
 
     /**
      * Filters the given url.
-     *
-     * @param url The url to filter.
-     *
+     * 
+     * @param url
+     *          The url to filter.
+     * 
      * @return The filtered url or null.
      */
     private String filterUrl(String url) {
@@ -186,7 +188,8 @@ public class WebGraph extends Configured
      * Returns the fetch time from the parse data or the current system time if
      * the fetch time doesn't exist.
      * 
-     * @param data The parse data.
+     * @param data
+     *          The parse data.
      * 
      * @return The fetch time as a long.
      */
@@ -198,8 +201,7 @@ public class WebGraph extends Configured
       try {
         // get the fetch time from the parse data
         fetchTime = Long.parseLong(fetchTimeStr);
-      }
-      catch (Exception e) {
+      } catch (Exception e) {
         fetchTime = System.currentTimeMillis();
       }
       return fetchTime;
@@ -244,8 +246,9 @@ public class WebGraph extends Configured
      * Passes through existing LinkDatum objects from an existing OutlinkDb and
      * maps out new LinkDatum objects from new crawls ParseData.
      */
-    public void map(Text key, Writable value, OutputCollector<Text, 
NutchWritable> 
-        output, Reporter reporter) throws IOException {
+    public void map(Text key, Writable value,
+        OutputCollector<Text, NutchWritable> output, Reporter reporter)
+        throws IOException {
 
       // normalize url, stop processing if null
       String url = normalizeUrl(key.toString());
@@ -262,20 +265,19 @@ public class WebGraph extends Configured
       key.set(url);
 
       if (value instanceof CrawlDatum) {
-        CrawlDatum datum = (CrawlDatum)value;
+        CrawlDatum datum = (CrawlDatum) value;
 
-        if (datum.getStatus() == CrawlDatum.STATUS_FETCH_REDIR_TEMP ||
-            datum.getStatus() == CrawlDatum.STATUS_FETCH_REDIR_PERM ||
-            datum.getStatus() == CrawlDatum.STATUS_FETCH_GONE) {
+        if (datum.getStatus() == CrawlDatum.STATUS_FETCH_REDIR_TEMP
+            || datum.getStatus() == CrawlDatum.STATUS_FETCH_REDIR_PERM
+            || datum.getStatus() == CrawlDatum.STATUS_FETCH_GONE) {
 
           // Tell the reducer to get rid of all instances of this key
           output.collect(key, new NutchWritable(new BooleanWritable(true)));
         }
-      }
-      else if (value instanceof ParseData) {
+      } else if (value instanceof ParseData) {
         // get the parse data and the outlinks from the parse data, along with
         // the fetch time for those links
-        ParseData data = (ParseData)value;
+        ParseData data = (ParseData) value;
         long fetchTime = getFetchTime(data);
         Outlink[] outlinkAr = data.getOutlinks();
         Map<String, String> outlinkMap = new LinkedHashMap<String, String>();
@@ -307,9 +309,8 @@ public class WebGraph extends Configured
           LinkDatum datum = new LinkDatum(outlinkUrl, anchor, fetchTime);
           output.collect(key, new NutchWritable(datum));
         }
-      }
-      else if (value instanceof LinkDatum) {
-        LinkDatum datum = (LinkDatum)value;
+      } else if (value instanceof LinkDatum) {
+        LinkDatum datum = (LinkDatum) value;
         String linkDatumUrl = normalizeUrl(datum.getUrl());
 
         if (filterUrl(linkDatumUrl) != null) {
@@ -323,7 +324,7 @@ public class WebGraph extends Configured
 
     public void reduce(Text key, Iterator<NutchWritable> values,
         OutputCollector<Text, LinkDatum> output, Reporter reporter)
-            throws IOException {
+        throws IOException {
 
       // aggregate all outlinks, get the most recent timestamp for a fetch
       // which should be the timestamp for all of the most recent outlinks
@@ -334,17 +335,17 @@ public class WebGraph extends Configured
 
         if (value instanceof LinkDatum) {
           // loop through, change out most recent timestamp if needed
-          LinkDatum next = (LinkDatum)value;
+          LinkDatum next = (LinkDatum) value;
           long timestamp = next.getTimestamp();
           if (mostRecent == 0L || mostRecent < timestamp) {
             mostRecent = timestamp;
           }
           outlinkList.add(WritableUtils.clone(next, conf));
           reporter.incrCounter("WebGraph.outlinks", "added links", 1);
-        }
-        else if (value instanceof BooleanWritable) {
-          BooleanWritable delete = (BooleanWritable)value;
-          // Actually, delete is always true, otherwise we don't emit it in 
the mapper in the first place
+        } else if (value instanceof BooleanWritable) {
+          BooleanWritable delete = (BooleanWritable) value;
+          // Actually, delete is always true, otherwise we don't emit it in the
+          // mapper in the first place
           if (delete.get() == true) {
            // This page is gone, do not emit its outlinks
             reporter.incrCounter("WebGraph.outlinks", "removed links", 1);
@@ -378,7 +379,8 @@ public class WebGraph extends Configured
             && (!limitPages || (limitPages && !pages.contains(toPage)))
             && (!limitDomains || (limitDomains && !domains.contains(toDomain)))
             && (!ignoreHost || (ignoreHost && !toHost.equalsIgnoreCase(host)))
-            && (!ignoreDomain || (ignoreDomain && 
!toDomain.equalsIgnoreCase(domain)))) {
+            && (!ignoreDomain || (ignoreDomain && !toDomain
+                .equalsIgnoreCase(domain)))) {
           output.collect(key, datum);
           pages.add(toPage);
           domains.add(toDomain);
@@ -395,7 +397,8 @@ public class WebGraph extends Configured
    * OutlinkDb LinkDatum objects and are regenerated each time the WebGraph is
    * updated.
    */
-  private static class InlinkDb extends Configured implements Mapper<Text, 
LinkDatum, Text, LinkDatum> {
+  private static class InlinkDb extends Configured implements
+      Mapper<Text, LinkDatum, Text, LinkDatum> {
 
     private long timestamp;
 
@@ -414,8 +417,9 @@ public class WebGraph extends Configured
      * Inverts the Outlink LinkDatum objects into new LinkDatum objects with a
      * new system timestamp, type and to and from url switched.
      */
-    public void map(Text key, LinkDatum datum, OutputCollector<Text, 
LinkDatum> 
-        output, Reporter reporter) throws IOException {
+    public void map(Text key, LinkDatum datum,
+        OutputCollector<Text, LinkDatum> output, Reporter reporter)
+        throws IOException {
 
       // get the to and from url and the anchor
       String fromUrl = key.toString();
@@ -433,21 +437,25 @@ public class WebGraph extends Configured
    * Creates the Node database which consists of the number of in and outlinks
    * for each url and a score slot for analysis programs such as LinkRank.
    */
-  private static class NodeDb extends Configured implements Reducer<Text, 
LinkDatum, Text, Node> {
+  private static class NodeDb extends Configured implements
+      Reducer<Text, LinkDatum, Text, Node> {
 
     /**
      * Configures job.
      */
-    public void configure(JobConf conf) { }
+    public void configure(JobConf conf) {
+    }
 
-    public void close() { }
+    public void close() {
+    }
 
     /**
      * Counts the number of inlinks and outlinks for each url and sets a 
default
      * score of 0.0 for each url (node) in the webgraph.
      */
-    public void reduce(Text key, Iterator<LinkDatum> values, 
OutputCollector<Text, Node> 
-        output, Reporter reporter) throws IOException {
+    public void reduce(Text key, Iterator<LinkDatum> values,
+        OutputCollector<Text, Node> output, Reporter reporter)
+        throws IOException {
 
       Node node = new Node();
       int numInlinks = 0;
@@ -458,8 +466,7 @@ public class WebGraph extends Configured
         LinkDatum next = values.next();
         if (next.getLinkType() == LinkDatum.INLINK) {
           numInlinks++;
-        }
-        else if (next.getLinkType() == LinkDatum.OUTLINK) {
+        } else if (next.getLinkType() == LinkDatum.OUTLINK) {
           numOutlinks++;
         }
       }
@@ -477,16 +484,21 @@ public class WebGraph extends Configured
    * Node. If a current WebGraph exists then it is updated, if it doesn't exist
    * then a new WebGraph database is created.
    * 
-   * @param webGraphDb The WebGraph to create or update.
-   * @param segments The array of segments used to update the WebGraph. Newer
-   * segments and fetch times will overwrite older segments.
-   * @param normalize whether to use URLNormalizers on URL's in the segment
-   * @param filter whether to use URLFilters on URL's in the segment
+   * @param webGraphDb
+   *          The WebGraph to create or update.
+   * @param segments
+   *          The array of segments used to update the WebGraph. Newer segments
+   *          and fetch times will overwrite older segments.
+   * @param normalize
+   *          whether to use URLNormalizers on URL's in the segment
+   * @param filter
+   *          whether to use URLFilters on URL's in the segment
    * 
-   * @throws IOException If an error occurs while processing the WebGraph.
+   * @throws IOException
+   *           If an error occurs while processing the WebGraph.
    */
-  public void createWebGraph(Path webGraphDb, Path[] segments, boolean 
normalize, boolean filter)
-      throws IOException {
+  public void createWebGraph(Path webGraphDb, Path[] segments,
+      boolean normalize, boolean filter) throws IOException {
 
     SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
     long start = System.currentTimeMillis();
@@ -563,7 +575,8 @@ public class WebGraph extends Configured
     outlinkJob.setOutputValueClass(LinkDatum.class);
     FileOutputFormat.setOutputPath(outlinkJob, tempOutlinkDb);
     outlinkJob.setOutputFormat(MapFileOutputFormat.class);
-    outlinkJob.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", 
false);
+    outlinkJob.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
+        false);
 
     // run the outlinkdb job and replace any old outlinkdb with the new one
     try {
@@ -572,10 +585,10 @@ public class WebGraph extends Configured
       LOG.info("OutlinkDb: installing " + outlinkDb);
       FSUtils.replace(fs, oldOutlinkDb, outlinkDb, true);
       FSUtils.replace(fs, outlinkDb, tempOutlinkDb, true);
-      if (!preserveBackup && fs.exists(oldOutlinkDb)) fs.delete(oldOutlinkDb, 
true);
+      if (!preserveBackup && fs.exists(oldOutlinkDb))
+        fs.delete(oldOutlinkDb, true);
       LOG.info("OutlinkDb: finished");
-    }
-    catch (IOException e) {
+    } catch (IOException e) {
 
      // remove lock file and temporary directory if an error occurs
       LockUtil.removeLockFile(fs, lock);
@@ -603,7 +616,8 @@ public class WebGraph extends Configured
     inlinkJob.setOutputValueClass(LinkDatum.class);
     FileOutputFormat.setOutputPath(inlinkJob, tempInlinkDb);
     inlinkJob.setOutputFormat(MapFileOutputFormat.class);
-    inlinkJob.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", 
false);
+    inlinkJob.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
+        false);
 
     try {
 
@@ -613,14 +627,13 @@ public class WebGraph extends Configured
       LOG.info("InlinkDb: installing " + inlinkDb);
       FSUtils.replace(fs, inlinkDb, tempInlinkDb, true);
       LOG.info("InlinkDb: finished");
-    }
-    catch (IOException e) {
+    } catch (IOException e) {
 
      // remove lock file and temporary directory if an error occurs
       LockUtil.removeLockFile(fs, lock);
       if (fs.exists(tempInlinkDb)) {
         fs.delete(tempInlinkDb, true);
-      }      
+      }
       LOG.error(StringUtils.stringifyException(e));
       throw e;
     }
@@ -644,7 +657,8 @@ public class WebGraph extends Configured
     nodeJob.setOutputValueClass(Node.class);
     FileOutputFormat.setOutputPath(nodeJob, tempNodeDb);
     nodeJob.setOutputFormat(MapFileOutputFormat.class);
-    nodeJob.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", 
false);
+    nodeJob.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
+        false);
 
     try {
 
@@ -654,14 +668,13 @@ public class WebGraph extends Configured
       LOG.info("NodeDb: installing " + nodeDb);
       FSUtils.replace(fs, nodeDb, tempNodeDb, true);
       LOG.info("NodeDb: finished");
-    }
-    catch (IOException e) {
+    } catch (IOException e) {
 
      // remove lock file and temporary directory if an error occurs
       LockUtil.removeLockFile(fs, lock);
       if (fs.exists(tempNodeDb)) {
         fs.delete(tempNodeDb, true);
-      }      
+      }
       LOG.error(StringUtils.stringifyException(e));
       throw e;
     }
@@ -670,7 +683,8 @@ public class WebGraph extends Configured
     LockUtil.removeLockFile(fs, lock);
 
     long end = System.currentTimeMillis();
-    LOG.info("WebGraphDb: finished at " + sdf.format(end) + ", elapsed: " + 
TimingUtil.elapsedTime(start, end));
+    LOG.info("WebGraphDb: finished at " + sdf.format(end) + ", elapsed: "
+        + TimingUtil.elapsedTime(start, end));
   }
 
   public static void main(String[] args) throws Exception {
@@ -683,26 +697,29 @@ public class WebGraph extends Configured
    */
   public int run(String[] args) throws Exception {
 
-    //boolean options
+    // boolean options
     Option helpOpt = new Option("h", "help", false, "show this help message");
-    Option normOpt = new Option("n", "normalize", false, "whether to use 
URLNormalizers on the URL's in the segment");
-    Option filtOpt = new Option("f", "filter", false, "whether to use 
URLFilters on the URL's in the segment");
+    Option normOpt = new Option("n", "normalize", false,
+        "whether to use URLNormalizers on the URL's in the segment");
+    Option filtOpt = new Option("f", "filter", false,
+        "whether to use URLFilters on the URL's in the segment");
 
-    //argument options
+    // argument options
     @SuppressWarnings("static-access")
-    Option graphOpt = OptionBuilder.withArgName("webgraphdb")
-        .hasArg().withDescription("the web graph database to create (if none 
exists) or use if one does")
+    Option graphOpt = OptionBuilder
+        .withArgName("webgraphdb")
+        .hasArg()
+        .withDescription(
+            "the web graph database to create (if none exists) or use if one 
does")
         .create("webgraphdb");
     @SuppressWarnings("static-access")
-    Option segOpt = OptionBuilder.withArgName("segment")
-        .hasArgs().withDescription("the segment(s) to use")
-        .create("segment");
+    Option segOpt = OptionBuilder.withArgName("segment").hasArgs()
+        .withDescription("the segment(s) to use").create("segment");
     @SuppressWarnings("static-access")
-    Option segDirOpt = OptionBuilder.withArgName("segmentDir")
-        .hasArgs().withDescription("the segment directory to use")
-        .create("segmentDir");
-    
-    //create the options
+    Option segDirOpt = OptionBuilder.withArgName("segmentDir").hasArgs()
+        .withDescription("the segment directory to use").create("segmentDir");
+
+    // create the options
     Options options = new Options();
     options.addOption(helpOpt);
     options.addOption(normOpt);
@@ -738,7 +755,8 @@ public class WebGraph extends Configured
       if (line.hasOption("segmentDir")) {
         Path dir = new Path(line.getOptionValue("segmentDir"));
         FileSystem fs = dir.getFileSystem(getConf());
-        FileStatus[] fstats = fs.listStatus(dir, 
HadoopFSUtil.getPassDirectoriesFilter(fs));
+        FileStatus[] fstats = fs.listStatus(dir,
+            HadoopFSUtil.getPassDirectoriesFilter(fs));
         segPaths = HadoopFSUtil.getPaths(fstats);
       }
 
@@ -756,8 +774,7 @@ public class WebGraph extends Configured
 
       createWebGraph(new Path(webGraphDb), segPaths, normalize, filter);
       return 0;
-    }
-    catch (Exception e) {
+    } catch (Exception e) {
       LOG.error("WebGraph: " + StringUtils.stringifyException(e));
       return -2;
     }

Modified: 
nutch/trunk/src/java/org/apache/nutch/scoring/webgraph/package-info.java
URL: 
http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/scoring/webgraph/package-info.java?rev=1655526&r1=1655525&r2=1655526&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/scoring/webgraph/package-info.java 
(original)
+++ nutch/trunk/src/java/org/apache/nutch/scoring/webgraph/package-info.java 
Thu Jan 29 05:38:59 2015
@@ -21,3 +21,4 @@
  * see {@link org.apache.nutch.scoring.webgraph.WebGraph}.
  */
 package org.apache.nutch.scoring.webgraph;
+

Modified: 
nutch/trunk/src/java/org/apache/nutch/segment/ContentAsTextInputFormat.java
URL: 
http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/segment/ContentAsTextInputFormat.java?rev=1655526&r1=1655525&r2=1655526&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/segment/ContentAsTextInputFormat.java 
(original)
+++ nutch/trunk/src/java/org/apache/nutch/segment/ContentAsTextInputFormat.java 
Thu Jan 29 05:38:59 2015
@@ -31,14 +31,14 @@ import org.apache.nutch.protocol.Content
 
 /**
  * An input format that takes Nutch Content objects and converts them to text
- * while converting newline endings to spaces.  This format is useful for 
working
+ * while converting newline endings to spaces. This format is useful for 
working
  * with Nutch content objects in Hadoop Streaming with other languages.
  */
-public class ContentAsTextInputFormat
-  extends SequenceFileInputFormat<Text, Text> {
+public class ContentAsTextInputFormat extends
+    SequenceFileInputFormat<Text, Text> {
 
-  private static class ContentAsTextRecordReader
-    implements RecordReader<Text, Text> {
+  private static class ContentAsTextRecordReader implements
+      RecordReader<Text, Text> {
 
     private final SequenceFileRecordReader<Text, Content> 
sequenceFileRecordReader;
 
@@ -46,9 +46,9 @@ public class ContentAsTextInputFormat
     private Content innerValue;
 
     public ContentAsTextRecordReader(Configuration conf, FileSplit split)
-      throws IOException {
+        throws IOException {
       sequenceFileRecordReader = new SequenceFileRecordReader<Text, Content>(
-        conf, split);
+          conf, split);
       innerKey = sequenceFileRecordReader.createKey();
       innerValue = sequenceFileRecordReader.createValue();
     }
@@ -61,9 +61,8 @@ public class ContentAsTextInputFormat
       return new Text();
     }
 
-    public synchronized boolean next(Text key, Text value)
-      throws IOException {
-      
+    public synchronized boolean next(Text key, Text value) throws IOException {
+
       // convert the content object to text
       Text tKey = key;
       Text tValue = value;
@@ -72,26 +71,23 @@ public class ContentAsTextInputFormat
       }
       tKey.set(innerKey.toString());
       String contentAsStr = new String(innerValue.getContent());
-      
+
       // replace new line endings with spaces
       contentAsStr = contentAsStr.replaceAll("\n", " ");
       value.set(contentAsStr);
-     
+
       return true;
     }
 
-    public float getProgress()
-      throws IOException {
+    public float getProgress() throws IOException {
       return sequenceFileRecordReader.getProgress();
     }
 
-    public synchronized long getPos()
-      throws IOException {
+    public synchronized long getPos() throws IOException {
       return sequenceFileRecordReader.getPos();
     }
 
-    public synchronized void close()
-      throws IOException {
+    public synchronized void close() throws IOException {
       sequenceFileRecordReader.close();
     }
   }
@@ -101,10 +97,9 @@ public class ContentAsTextInputFormat
   }
 
   public RecordReader<Text, Text> getRecordReader(InputSplit split,
-    JobConf job, Reporter reporter)
-    throws IOException {
+      JobConf job, Reporter reporter) throws IOException {
 
     reporter.setStatus(split.toString());
-    return new ContentAsTextRecordReader(job, (FileSplit)split);
+    return new ContentAsTextRecordReader(job, (FileSplit) split);
   }
 }

Modified: nutch/trunk/src/java/org/apache/nutch/segment/SegmentMergeFilters.java
URL: 
http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMergeFilters.java?rev=1655526&r1=1655525&r2=1655526&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/segment/SegmentMergeFilters.java 
(original)
+++ nutch/trunk/src/java/org/apache/nutch/segment/SegmentMergeFilters.java Thu 
Jan 29 05:38:59 2015
@@ -39,7 +39,8 @@ import org.apache.nutch.protocol.Content
  * 
  */
 public class SegmentMergeFilters {
-  private static final Logger LOG = 
LoggerFactory.getLogger(SegmentMergeFilters.class);
+  private static final Logger LOG = LoggerFactory
+      .getLogger(SegmentMergeFilters.class);
   private SegmentMergeFilter[] filters;
 
   public SegmentMergeFilters(Configuration conf) {

Modified: nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java
URL: 
http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java?rev=1655526&r1=1655525&r2=1655526&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java Thu Jan 29 
05:38:59 2015
@@ -73,40 +73,47 @@ import org.apache.nutch.util.NutchJob;
  * <p>
  * Also, it's possible to slice the resulting segment into chunks of fixed 
size.
  * </p>
- * <h3>Important Notes</h3>
- * <h4>Which parts are merged?</h4>
- * <p>It doesn't make sense to merge data from segments, which are at 
different stages
- * of processing (e.g. one unfetched segment, one fetched but not parsed, and
- * one fetched and parsed). Therefore, prior to merging, the tool will 
determine
- * the lowest common set of input data, and only this data will be merged.
- * This may have some unintended consequences:
- * e.g. if majority of input segments are fetched and parsed, but one of them 
is unfetched,
- * the tool will fall back to just merging fetchlists, and it will skip all 
other data
- * from all segments.</p>
+ * <h3>Important Notes</h3> <h4>Which parts are merged?</h4>
+ * <p>
+ * It doesn't make sense to merge data from segments, which are at different
+ * stages of processing (e.g. one unfetched segment, one fetched but not 
parsed,
+ * and one fetched and parsed). Therefore, prior to merging, the tool will
+ * determine the lowest common set of input data, and only this data will be
+ * merged. This may have some unintended consequences: e.g. if majority of 
input
+ * segments are fetched and parsed, but one of them is unfetched, the tool will
+ * fall back to just merging fetchlists, and it will skip all other data from
+ * all segments.
+ * </p>
  * <h4>Merging fetchlists</h4>
- * <p>Merging segments, which contain just fetchlists (i.e. prior to fetching)
- * is not recommended, because this tool (unlike the {@link 
org.apache.nutch.crawl.Generator}
- * doesn't ensure that fetchlist parts for each map task are disjoint.</p>
+ * <p>
+ * Merging segments, which contain just fetchlists (i.e. prior to fetching) is
+ * not recommended, because this tool (unlike the
+ * {@link org.apache.nutch.crawl.Generator}) doesn't ensure that fetchlist parts
+ * for each map task are disjoint.
+ * </p>
  * <p>
  * <h4>Duplicate content</h4>
- * Merging segments removes older content whenever possible (see below). 
However,
- * this is NOT the same as de-duplication, which in addition removes identical
- * content found at different URL-s. In other words, running DeleteDuplicates 
is
- * still necessary.
+ * Merging segments removes older content whenever possible (see below).
+ * However, this is NOT the same as de-duplication, which in addition removes
+ * identical content found at different URL-s. In other words, running
+ * DeleteDuplicates is still necessary.
+ * </p>
+ * <p>
+ * For some types of data (especially ParseText) it's not possible to determine
+ * which version is really older. Therefore the tool always uses segment names
+ * as timestamps, for all types of input data. Segment names are compared in
+ * forward lexicographic order (0-9a-zA-Z), and data from segments with 
"higher"
+ * names will prevail. It follows then that it is extremely important that
+ * segments be named in an increasing lexicographic order as their creation 
time
+ * increases.
  * </p>
- * <p>For some types of data (especially ParseText) it's not possible to 
determine
- * which version is really older. Therefore the tool always uses segment names 
as
- * timestamps, for all types of input data. Segment names are compared in 
forward lexicographic
- * order (0-9a-zA-Z), and data from segments with "higher" names will prevail.
- * It follows then that it is extremely important that segments be named in an
- * increasing lexicographic order as their creation time increases.</p>
  * <p>
  * <h4>Merging and indexes</h4>
  * Merged segment gets a different name. Since Indexer embeds segment names in
- * indexes, any indexes originally created for the input segments will NOT 
work with the
- * merged segment. Newly created merged segment(s) need to be indexed afresh.
- * This tool doesn't use existing indexes in any way, so if
- * you plan to merge segments you don't have to index them prior to merging.
+ * indexes, any indexes originally created for the input segments will NOT work
+ * with the merged segment. Newly created merged segment(s) need to be indexed
+ * afresh. This tool doesn't use existing indexes in any way, so if you plan to
+ * merge segments you don't have to index them prior to merging.
  * 
  * 
  * @author Andrzej Bialecki
@@ -114,7 +121,8 @@ import org.apache.nutch.util.NutchJob;
 public class SegmentMerger extends Configured implements
     Mapper<Text, MetaWrapper, Text, MetaWrapper>,
     Reducer<Text, MetaWrapper, Text, MetaWrapper> {
-  private static final Logger LOG = 
LoggerFactory.getLogger(SegmentMerger.class);
+  private static final Logger LOG = LoggerFactory
+      .getLogger(SegmentMerger.class);
 
   private static final String SEGMENT_PART_KEY = "part";
   private static final String SEGMENT_SLICE_KEY = "slice";
@@ -124,20 +132,21 @@ public class SegmentMerger extends Confi
   private SegmentMergeFilters mergeFilters = null;
   private long sliceSize = -1;
   private long curCount = 0;
-  
+
   /**
-   * Wraps inputs in an {@link MetaWrapper}, to permit merging different
-   * types in reduce and use additional metadata.
+   * Wraps inputs in an {@link MetaWrapper}, to permit merging different types
+   * in reduce and use additional metadata.
    */
   public static class ObjectInputFormat extends
-    SequenceFileInputFormat<Text, MetaWrapper> {
-    
+      SequenceFileInputFormat<Text, MetaWrapper> {
+
     @Override
-    public RecordReader<Text, MetaWrapper> getRecordReader(final InputSplit 
split,
-        final JobConf job, Reporter reporter) throws IOException {
+    public RecordReader<Text, MetaWrapper> getRecordReader(
+        final InputSplit split, final JobConf job, Reporter reporter)
+        throws IOException {
 
       reporter.setStatus(split.toString());
-      
+
       // find part name
       SegmentPart segmentPart;
       final String spString;
@@ -148,10 +157,10 @@ public class SegmentMerger extends Confi
       } catch (IOException e) {
         throw new RuntimeException("Cannot identify segment:", e);
       }
-      
-      SequenceFile.Reader reader =
-        new SequenceFile.Reader(FileSystem.get(job), fSplit.getPath(), job);
-      
+
+      SequenceFile.Reader reader = new SequenceFile.Reader(FileSystem.get(job),
+          fSplit.getPath(), job);
+
       final Writable w;
       try {
         w = (Writable) reader.getValueClass().newInstance();
@@ -164,13 +173,14 @@ public class SegmentMerger extends Confi
           // ignore
         }
       }
-      final SequenceFileRecordReader<Text,Writable> splitReader =
-        new SequenceFileRecordReader<Text,Writable>(job, (FileSplit)split);
+      final SequenceFileRecordReader<Text, Writable> splitReader = new 
SequenceFileRecordReader<Text, Writable>(
+          job, (FileSplit) split);
 
       try {
         return new SequenceFileRecordReader<Text, MetaWrapper>(job, fSplit) {
-          
-          public synchronized boolean next(Text key, MetaWrapper wrapper) 
throws IOException {
+
+          public synchronized boolean next(Text key, MetaWrapper wrapper)
+              throws IOException {
             LOG.debug("Running OIF.next()");
 
             boolean res = splitReader.next(key, w);
@@ -178,17 +188,17 @@ public class SegmentMerger extends Confi
             wrapper.setMeta(SEGMENT_PART_KEY, spString);
             return res;
           }
-          
+
           @Override
           public synchronized void close() throws IOException {
             splitReader.close();
           }
-          
+
           @Override
           public MetaWrapper createValue() {
             return new MetaWrapper();
           }
-          
+
         };
       } catch (IOException e) {
         throw new RuntimeException("Cannot create RecordReader: ", e);
@@ -196,11 +206,14 @@ public class SegmentMerger extends Confi
     }
   }
 
-  public static class SegmentOutputFormat extends FileOutputFormat<Text, 
MetaWrapper> {
+  public static class SegmentOutputFormat extends
+      FileOutputFormat<Text, MetaWrapper> {
     private static final String DEFAULT_SLICE = "default";
-    
+
     @Override
-    public RecordWriter<Text, MetaWrapper> getRecordWriter(final FileSystem 
fs, final JobConf job, final String name, final Progressable progress) throws 
IOException {
+    public RecordWriter<Text, MetaWrapper> getRecordWriter(final FileSystem fs,
+        final JobConf job, final String name, final Progressable progress)
+        throws IOException {
       return new RecordWriter<Text, MetaWrapper>() {
         MapFile.Writer c_out = null;
         MapFile.Writer f_out = null;
@@ -210,7 +223,7 @@ public class SegmentMerger extends Confi
         SequenceFile.Writer p_out = null;
         HashMap<String, Closeable> sliceWriters = new HashMap<String, 
Closeable>();
         String segmentName = job.get("segment.merger.segmentName");
-        
+
         public void write(Text key, MetaWrapper wrapper) throws IOException {
           // unwrap
           SegmentPart sp = 
SegmentPart.parse(wrapper.getMeta(SEGMENT_PART_KEY));
@@ -221,13 +234,15 @@ public class SegmentMerger extends Confi
               g_out = ensureSequenceFile(slice, CrawlDatum.GENERATE_DIR_NAME);
               g_out.append(key, o);
             } else if (sp.partName.equals(CrawlDatum.FETCH_DIR_NAME)) {
-              f_out = ensureMapFile(slice, CrawlDatum.FETCH_DIR_NAME, 
CrawlDatum.class);
+              f_out = ensureMapFile(slice, CrawlDatum.FETCH_DIR_NAME,
+                  CrawlDatum.class);
               f_out.append(key, o);
             } else if (sp.partName.equals(CrawlDatum.PARSE_DIR_NAME)) {
               p_out = ensureSequenceFile(slice, CrawlDatum.PARSE_DIR_NAME);
               p_out.append(key, o);
             } else {
-              throw new IOException("Cannot determine segment part: " + 
sp.partName);
+              throw new IOException("Cannot determine segment part: "
+                  + sp.partName);
             }
           } else if (o instanceof Content) {
             c_out = ensureMapFile(slice, Content.DIR_NAME, Content.class);
@@ -235,9 +250,11 @@ public class SegmentMerger extends Confi
           } else if (o instanceof ParseData) {
             // update the segment name inside contentMeta - required by Indexer
             if (slice == null) {
-              ((ParseData)o).getContentMeta().set(Nutch.SEGMENT_NAME_KEY, 
segmentName);
+              ((ParseData) o).getContentMeta().set(Nutch.SEGMENT_NAME_KEY,
+                  segmentName);
             } else {
-              ((ParseData)o).getContentMeta().set(Nutch.SEGMENT_NAME_KEY, 
segmentName + "-" + slice);
+              ((ParseData) o).getContentMeta().set(Nutch.SEGMENT_NAME_KEY,
+                  segmentName + "-" + slice);
             }
             pd_out = ensureMapFile(slice, ParseData.DIR_NAME, ParseData.class);
             pd_out.append(key, o);
@@ -246,20 +263,26 @@ public class SegmentMerger extends Confi
             pt_out.append(key, o);
           }
         }
-        
+
         // lazily create SequenceFile-s.
-        private SequenceFile.Writer ensureSequenceFile(String slice, String 
dirName) throws IOException {
-          if (slice == null) slice = DEFAULT_SLICE;
-          SequenceFile.Writer res = 
(SequenceFile.Writer)sliceWriters.get(slice + dirName);
-          if (res != null) return res;
+        private SequenceFile.Writer ensureSequenceFile(String slice,
+            String dirName) throws IOException {
+          if (slice == null)
+            slice = DEFAULT_SLICE;
+          SequenceFile.Writer res = (SequenceFile.Writer) sliceWriters
+              .get(slice + dirName);
+          if (res != null)
+            return res;
           Path wname;
           Path out = FileOutputFormat.getOutputPath(job);
           if (slice == DEFAULT_SLICE) {
-            wname = new Path(new Path(new Path(out, segmentName), dirName), 
name);
+            wname = new Path(new Path(new Path(out, segmentName), dirName),
+                name);
           } else {
-            wname = new Path(new Path(new Path(out, segmentName + "-" + 
slice), dirName), name);
+            wname = new Path(new Path(new Path(out, segmentName + "-" + slice),
+                dirName), name);
           }
-          res = SequenceFile.createWriter(fs, job, wname, Text.class, 
+          res = SequenceFile.createWriter(fs, job, wname, Text.class,
               CrawlDatum.class,
               SequenceFileOutputFormat.getOutputCompressionType(job), 
progress);
           sliceWriters.put(slice + dirName, res);
@@ -267,23 +290,30 @@ public class SegmentMerger extends Confi
         }
 
         // lazily create MapFile-s.
-        private MapFile.Writer ensureMapFile(String slice, String dirName, 
Class<? extends Writable> clazz) throws IOException {
-          if (slice == null) slice = DEFAULT_SLICE;
-          MapFile.Writer res = (MapFile.Writer)sliceWriters.get(slice + 
dirName);
-          if (res != null) return res;
+        private MapFile.Writer ensureMapFile(String slice, String dirName,
+            Class<? extends Writable> clazz) throws IOException {
+          if (slice == null)
+            slice = DEFAULT_SLICE;
+          MapFile.Writer res = (MapFile.Writer) sliceWriters.get(slice
+              + dirName);
+          if (res != null)
+            return res;
           Path wname;
           Path out = FileOutputFormat.getOutputPath(job);
           if (slice == DEFAULT_SLICE) {
-            wname = new Path(new Path(new Path(out, segmentName), dirName), 
name);
+            wname = new Path(new Path(new Path(out, segmentName), dirName),
+                name);
           } else {
-            wname = new Path(new Path(new Path(out, segmentName + "-" + 
slice), dirName), name);
+            wname = new Path(new Path(new Path(out, segmentName + "-" + slice),
+                dirName), name);
           }
-          CompressionType compType = 
-              SequenceFileOutputFormat.getOutputCompressionType(job);
+          CompressionType compType = SequenceFileOutputFormat
+              .getOutputCompressionType(job);
           if (clazz.isAssignableFrom(ParseText.class)) {
             compType = CompressionType.RECORD;
           }
-          res = new MapFile.Writer(job, fs, wname.toString(), Text.class, 
clazz, compType, progress);
+          res = new MapFile.Writer(job, fs, wname.toString(), Text.class,
+              clazz, compType, progress);
           sliceWriters.put(slice + dirName, res);
           return res;
         }
@@ -293,9 +323,9 @@ public class SegmentMerger extends Confi
           while (it.hasNext()) {
             Object o = it.next();
             if (o instanceof SequenceFile.Writer) {
-              ((SequenceFile.Writer)o).close();
+              ((SequenceFile.Writer) o).close();
             } else {
-              ((MapFile.Writer)o).close();
+              ((MapFile.Writer) o).close();
             }
           }
         }
@@ -306,14 +336,15 @@ public class SegmentMerger extends Confi
   public SegmentMerger() {
     super(null);
   }
-  
+
   public SegmentMerger(Configuration conf) {
     super(conf);
   }
-  
+
   public void setConf(Configuration conf) {
     super.setConf(conf);
-    if (conf == null) return;
+    if (conf == null)
+      return;
     if (conf.getBoolean("segment.merger.filter", false)) {
       filters = new URLFilters(conf);
       mergeFilters = new SegmentMergeFilters(conf);
@@ -335,15 +366,18 @@ public class SegmentMerger extends Confi
       sliceSize = sliceSize / conf.getNumReduceTasks();
     }
   }
-  
+
   private Text newKey = new Text();
-  
+
   public void map(Text key, MetaWrapper value,
-      OutputCollector<Text, MetaWrapper> output, Reporter reporter) throws 
IOException {
+      OutputCollector<Text, MetaWrapper> output, Reporter reporter)
+      throws IOException {
     String url = key.toString();
     if (normalizers != null) {
       try {
-        url = normalizers.normalize(url, URLNormalizers.SCOPE_DEFAULT); // 
normalize the url
+        url = normalizers.normalize(url, URLNormalizers.SCOPE_DEFAULT); // 
normalize
+                                                                        // the
+                                                                        // url
       } catch (Exception e) {
         LOG.warn("Skipping " + url + ":" + e.getMessage());
         url = null;
@@ -357,7 +391,7 @@ public class SegmentMerger extends Confi
         url = null;
       }
     }
-    if(url != null) {
+    if (url != null) {
       newKey.set(url);
       output.collect(newKey, value);
     }
@@ -365,12 +399,13 @@ public class SegmentMerger extends Confi
 
   /**
    * NOTE: in selecting the latest version we rely exclusively on the segment
-   * name (not all segment data contain time information). Therefore it is 
extremely
-   * important that segments be named in an increasing lexicographic order as
-   * their creation time increases.
+   * name (not all segment data contain time information). Therefore it is
+   * extremely important that segments be named in an increasing lexicographic
+   * order as their creation time increases.
    */
   public void reduce(Text key, Iterator<MetaWrapper> values,
-      OutputCollector<Text, MetaWrapper> output, Reporter reporter) throws 
IOException {
+      OutputCollector<Text, MetaWrapper> output, Reporter reporter)
+      throws IOException {
     CrawlDatum lastG = null;
     CrawlDatum lastF = null;
     CrawlDatum lastSig = null;
@@ -383,18 +418,17 @@ public class SegmentMerger extends Confi
     String lastCname = null;
     String lastPDname = null;
     String lastPTname = null;
-    TreeMap<String, ArrayList<CrawlDatum>> linked =
-      new TreeMap<String, ArrayList<CrawlDatum>>();
+    TreeMap<String, ArrayList<CrawlDatum>> linked = new TreeMap<String, 
ArrayList<CrawlDatum>>();
     while (values.hasNext()) {
       MetaWrapper wrapper = values.next();
       Object o = wrapper.get();
       String spString = wrapper.getMeta(SEGMENT_PART_KEY);
       if (spString == null) {
-        throw new IOException("Null segment part, key=" + key);        
+        throw new IOException("Null segment part, key=" + key);
       }
       SegmentPart sp = SegmentPart.parse(spString);
       if (o instanceof CrawlDatum) {
-        CrawlDatum val = (CrawlDatum)o;
+        CrawlDatum val = (CrawlDatum) o;
         // check which output dir it belongs to
         if (sp.partName.equals(CrawlDatum.GENERATE_DIR_NAME)) {
           if (lastG == null) {
@@ -411,9 +445,9 @@ public class SegmentMerger extends Confi
           // only consider fetch status and ignore fetch retry status
           // https://issues.apache.org/jira/browse/NUTCH-1520
           // https://issues.apache.org/jira/browse/NUTCH-1113
-          if (CrawlDatum.hasFetchStatus(val) &&
-            val.getStatus() != CrawlDatum.STATUS_FETCH_RETRY &&
-            val.getStatus() != CrawlDatum.STATUS_FETCH_NOTMODIFIED) {
+          if (CrawlDatum.hasFetchStatus(val)
+              && val.getStatus() != CrawlDatum.STATUS_FETCH_RETRY
+              && val.getStatus() != CrawlDatum.STATUS_FETCH_NOTMODIFIED) {
             if (lastF == null) {
               lastF = val;
               lastFname = sp.segmentName;
@@ -450,40 +484,40 @@ public class SegmentMerger extends Confi
         }
       } else if (o instanceof Content) {
         if (lastC == null) {
-          lastC = (Content)o;
+          lastC = (Content) o;
           lastCname = sp.segmentName;
         } else {
           if (lastCname.compareTo(sp.segmentName) < 0) {
-            lastC = (Content)o;
+            lastC = (Content) o;
             lastCname = sp.segmentName;
           }
         }
       } else if (o instanceof ParseData) {
         if (lastPD == null) {
-          lastPD = (ParseData)o;
+          lastPD = (ParseData) o;
           lastPDname = sp.segmentName;
         } else {
           if (lastPDname.compareTo(sp.segmentName) < 0) {
-            lastPD = (ParseData)o;
+            lastPD = (ParseData) o;
             lastPDname = sp.segmentName;
           }
         }
       } else if (o instanceof ParseText) {
         if (lastPT == null) {
-          lastPT = (ParseText)o;
+          lastPT = (ParseText) o;
           lastPTname = sp.segmentName;
         } else {
           if (lastPTname.compareTo(sp.segmentName) < 0) {
-            lastPT = (ParseText)o;
+            lastPT = (ParseText) o;
             lastPTname = sp.segmentName;
           }
         }
       }
     }
-       // perform filtering based on full merge record
-    if (mergeFilters != null && 
-        !mergeFilters.filter(key, lastG, lastF, lastSig, lastC, lastPD, 
lastPT, 
-                                                  linked.isEmpty() ? null : 
linked.lastEntry().getValue())){
+    // perform filtering based on full merge record
+    if (mergeFilters != null
+        && !mergeFilters.filter(key, lastG, lastF, lastSig, lastC, lastPD,
+            lastPT, linked.isEmpty() ? null : linked.lastEntry().getValue())) {
       return;
     }
 
@@ -552,10 +586,12 @@ public class SegmentMerger extends Confi
     }
   }
 
-  public void merge(Path out, Path[] segs, boolean filter, boolean normalize, 
long slice) throws Exception {
+  public void merge(Path out, Path[] segs, boolean filter, boolean normalize,
+      long slice) throws Exception {
     String segmentName = Generator.generateSegmentName();
     if (LOG.isInfoEnabled()) {
-      LOG.info("Merging " + segs.length + " segments to " + out + "/" + 
segmentName);
+      LOG.info("Merging " + segs.length + " segments to " + out + "/"
+          + segmentName);
     }
     JobConf job = new NutchJob(getConf());
     job.setJobName("mergesegs " + out + "/" + segmentName);
@@ -596,17 +632,24 @@ public class SegmentMerger extends Confi
       pt = pt && fs.exists(ptDir);
     }
     StringBuffer sb = new StringBuffer();
-    if (c) sb.append(" " + Content.DIR_NAME);
-    if (g) sb.append(" " + CrawlDatum.GENERATE_DIR_NAME);
-    if (f) sb.append(" " + CrawlDatum.FETCH_DIR_NAME);
-    if (p) sb.append(" " + CrawlDatum.PARSE_DIR_NAME);
-    if (pd) sb.append(" " + ParseData.DIR_NAME);
-    if (pt) sb.append(" " + ParseText.DIR_NAME);
+    if (c)
+      sb.append(" " + Content.DIR_NAME);
+    if (g)
+      sb.append(" " + CrawlDatum.GENERATE_DIR_NAME);
+    if (f)
+      sb.append(" " + CrawlDatum.FETCH_DIR_NAME);
+    if (p)
+      sb.append(" " + CrawlDatum.PARSE_DIR_NAME);
+    if (pd)
+      sb.append(" " + ParseData.DIR_NAME);
+    if (pt)
+      sb.append(" " + ParseText.DIR_NAME);
     if (LOG.isInfoEnabled()) {
       LOG.info("SegmentMerger: using segment data from:" + sb.toString());
     }
     for (int i = 0; i < segs.length; i++) {
-      if (segs[i] == null) continue;
+      if (segs[i] == null)
+        continue;
       if (g) {
         Path gDir = new Path(segs[i], CrawlDatum.GENERATE_DIR_NAME);
         FileInputFormat.addInputPath(job, gDir);
@@ -639,9 +682,9 @@ public class SegmentMerger extends Confi
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(MetaWrapper.class);
     job.setOutputFormat(SegmentOutputFormat.class);
-    
+
     setConf(job);
-    
+
     JobClient.runJob(job);
   }
 
@@ -650,13 +693,19 @@ public class SegmentMerger extends Confi
    */
   public static void main(String[] args) throws Exception {
     if (args.length < 2) {
-      System.err.println("SegmentMerger output_dir (-dir segments | seg1 seg2 
...) [-filter] [-slice NNNN]");
-      System.err.println("\toutput_dir\tname of the parent dir for output 
segment slice(s)");
-      System.err.println("\t-dir segments\tparent dir containing several 
segments");
+      System.err
+          .println("SegmentMerger output_dir (-dir segments | seg1 seg2 ...) 
[-filter] [-slice NNNN]");
+      System.err
+          .println("\toutput_dir\tname of the parent dir for output segment 
slice(s)");
+      System.err
+          .println("\t-dir segments\tparent dir containing several segments");
       System.err.println("\tseg1 seg2 ...\tlist of segment dirs");
-      System.err.println("\t-filter\t\tfilter out URL-s prohibited by current 
URLFilters");
-      System.err.println("\t-normalize\t\tnormalize URL via current 
URLNormalizers");
-      System.err.println("\t-slice NNNN\tcreate many output segments, each 
containing NNNN URLs");
+      System.err
+          .println("\t-filter\t\tfilter out URL-s prohibited by current 
URLFilters");
+      System.err
+          .println("\t-normalize\t\tnormalize URL via current URLNormalizers");
+      System.err
+          .println("\t-slice NNNN\tcreate many output segments, each 
containing NNNN URLs");
       return;
     }
     Configuration conf = NutchConfiguration.create();
@@ -688,7 +737,8 @@ public class SegmentMerger extends Confi
       return;
     }
     SegmentMerger merger = new SegmentMerger(conf);
-    merger.merge(out, segs.toArray(new Path[segs.size()]), filter, normalize, 
sliceSize);
+    merger.merge(out, segs.toArray(new Path[segs.size()]), filter, normalize,
+        sliceSize);
   }
 
 }

Modified: nutch/trunk/src/java/org/apache/nutch/segment/SegmentPart.java
URL: 
http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/segment/SegmentPart.java?rev=1655526&r1=1655525&r2=1655526&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/segment/SegmentPart.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/segment/SegmentPart.java Thu Jan 29 
05:38:59 2015
@@ -30,16 +30,16 @@ public class SegmentPart {
   public String segmentName;
   /** Name of the segment part (ie. one of subdirectories inside a segment). */
   public String partName;
-  
+
   public SegmentPart() {
-    
+
   }
-  
+
   public SegmentPart(String segmentName, String partName) {
     this.segmentName = segmentName;
     this.partName = partName;
   }
-  
+
   /**
    * Return a String representation of this class, in the form
    * "segmentName/partName".
@@ -47,23 +47,27 @@ public class SegmentPart {
   public String toString() {
     return segmentName + "/" + partName;
   }
-  
+
   /**
    * Create SegmentPart from a FileSplit.
+   * 
    * @param split
-   * @return A {@link SegmentPart} resultant from a 
-   * {@link FileSplit}.
+   * @return A {@link SegmentPart} resultant from a {@link FileSplit}.
    * @throws Exception
    */
   public static SegmentPart get(FileSplit split) throws IOException {
     return get(split.getPath().toString());
   }
-  
+
   /**
    * Create SegmentPart from a full path of a location inside any segment part.
-   * @param path full path into a segment part (may include "part-xxxxx" 
components)
+   * 
+   * @param path
+   *          full path into a segment part (may include "part-xxxxx"
+   *          components)
    * @return SegmentPart instance describing this part.
-   * @throws IOException if any required path components are missing.
+   * @throws IOException
+   *           if any required path components are missing.
    */
   public static SegmentPart get(String path) throws IOException {
     // find part name
@@ -87,12 +91,15 @@ public class SegmentPart {
     String segment = dir.substring(idx + 1);
     return new SegmentPart(segment, part);
   }
-  
+
   /**
    * Create SegmentPart from a String in format "segmentName/partName".
-   * @param string input String
+   * 
+   * @param string
+   *          input String
    * @return parsed instance of SegmentPart
-   * @throws IOException if "/" is missing.
+   * @throws IOException
+   *           if "/" is missing.
    */
   public static SegmentPart parse(String string) throws IOException {
     int idx = string.indexOf('/');

Modified: nutch/trunk/src/java/org/apache/nutch/segment/SegmentReader.java
URL: 
http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/segment/SegmentReader.java?rev=1655526&r1=1655525&r2=1655526&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/segment/SegmentReader.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/segment/SegmentReader.java Thu Jan 29 
05:38:59 2015
@@ -75,7 +75,7 @@ public class SegmentReader extends Confi
   public static final Logger LOG = 
LoggerFactory.getLogger(SegmentReader.class);
 
   long recNo = 0L;
-  
+
   private boolean co, fe, ge, pa, pd, pt;
   private FileSystem fs;
 
@@ -84,33 +84,38 @@ public class SegmentReader extends Confi
     private Text newKey = new Text();
 
     public void map(WritableComparable<?> key, Writable value,
-        OutputCollector<Text, NutchWritable> collector, Reporter reporter) 
throws IOException {
+        OutputCollector<Text, NutchWritable> collector, Reporter reporter)
+        throws IOException {
       // convert on the fly from old formats with UTF8 keys.
       // UTF8 deprecated and replaced by Text.
       if (key instanceof Text) {
         newKey.set(key.toString());
         key = newKey;
       }
-      collector.collect((Text)key, new NutchWritable(value));
+      collector.collect((Text) key, new NutchWritable(value));
     }
-    
+
   }
 
   /** Implements a text output format */
   public static class TextOutputFormat extends
       FileOutputFormat<WritableComparable<?>, Writable> {
     public RecordWriter<WritableComparable<?>, Writable> getRecordWriter(
-        final FileSystem fs, JobConf job,
-        String name, final Progressable progress) throws IOException {
+        final FileSystem fs, JobConf job, String name,
+        final Progressable progress) throws IOException {
 
-      final Path segmentDumpFile = new 
Path(FileOutputFormat.getOutputPath(job), name);
+      final Path segmentDumpFile = new Path(
+          FileOutputFormat.getOutputPath(job), name);
 
       // Get the old copy out of the way
-      if (fs.exists(segmentDumpFile)) fs.delete(segmentDumpFile, true);
+      if (fs.exists(segmentDumpFile))
+        fs.delete(segmentDumpFile, true);
 
-      final PrintStream printStream = new 
PrintStream(fs.create(segmentDumpFile));
+      final PrintStream printStream = new PrintStream(
+          fs.create(segmentDumpFile));
       return new RecordWriter<WritableComparable<?>, Writable>() {
-        public synchronized void write(WritableComparable<?> key, Writable 
value) throws IOException {
+        public synchronized void write(WritableComparable<?> key, Writable 
value)
+            throws IOException {
           printStream.println(value);
         }
 
@@ -124,9 +129,9 @@ public class SegmentReader extends Confi
   public SegmentReader() {
     super(null);
   }
-  
-  public SegmentReader(Configuration conf, boolean co, boolean fe, boolean ge, 
boolean pa,
-          boolean pd, boolean pt) {
+
+  public SegmentReader(Configuration conf, boolean co, boolean fe, boolean ge,
+      boolean pa, boolean pd, boolean pt) {
     super(conf);
     this.co = co;
     this.fe = fe;
@@ -166,12 +171,12 @@ public class SegmentReader extends Confi
     job.setBoolean("segment.reader.pt", this.pt);
     return job;
   }
-  
-  public void close() {}
+
+  public void close() {
+  }
 
   public void reduce(Text key, Iterator<NutchWritable> values,
-      OutputCollector<Text, Text> output, Reporter reporter)
-          throws IOException {
+      OutputCollector<Text, Text> output, Reporter reporter) throws 
IOException {
     StringBuffer dump = new StringBuffer();
 
     dump.append("\nRecno:: ").append(recNo++).append("\n");
@@ -194,7 +199,7 @@ public class SegmentReader extends Confi
   }
 
   public void dump(Path segment, Path output) throws IOException {
-    
+
     if (LOG.isInfoEnabled()) {
       LOG.info("SegmentReader: dump segment: " + segment);
     }
@@ -202,20 +207,30 @@ public class SegmentReader extends Confi
     JobConf job = createJobConf();
     job.setJobName("read " + segment);
 
-    if (ge) FileInputFormat.addInputPath(job, new Path(segment, 
CrawlDatum.GENERATE_DIR_NAME));
-    if (fe) FileInputFormat.addInputPath(job, new Path(segment, 
CrawlDatum.FETCH_DIR_NAME));
-    if (pa) FileInputFormat.addInputPath(job, new Path(segment, 
CrawlDatum.PARSE_DIR_NAME));
-    if (co) FileInputFormat.addInputPath(job, new Path(segment, 
Content.DIR_NAME));
-    if (pd) FileInputFormat.addInputPath(job, new Path(segment, 
ParseData.DIR_NAME));
-    if (pt) FileInputFormat.addInputPath(job, new Path(segment, 
ParseText.DIR_NAME));
+    if (ge)
+      FileInputFormat.addInputPath(job, new Path(segment,
+          CrawlDatum.GENERATE_DIR_NAME));
+    if (fe)
+      FileInputFormat.addInputPath(job, new Path(segment,
+          CrawlDatum.FETCH_DIR_NAME));
+    if (pa)
+      FileInputFormat.addInputPath(job, new Path(segment,
+          CrawlDatum.PARSE_DIR_NAME));
+    if (co)
+      FileInputFormat.addInputPath(job, new Path(segment, Content.DIR_NAME));
+    if (pd)
+      FileInputFormat.addInputPath(job, new Path(segment, ParseData.DIR_NAME));
+    if (pt)
+      FileInputFormat.addInputPath(job, new Path(segment, ParseText.DIR_NAME));
 
     job.setInputFormat(SequenceFileInputFormat.class);
     job.setMapperClass(InputCompatMapper.class);
     job.setReducerClass(SegmentReader.class);
 
-    Path tempDir = new Path(job.get("hadoop.tmp.dir", "/tmp") + "/segread-" + 
new java.util.Random().nextInt());
+    Path tempDir = new Path(job.get("hadoop.tmp.dir", "/tmp") + "/segread-"
+        + new java.util.Random().nextInt());
     fs.delete(tempDir, true);
-    
+
     FileOutputFormat.setOutputPath(job, tempDir);
     job.setOutputFormat(TextOutputFormat.class);
     job.setOutputKeyClass(Text.class);
@@ -228,22 +243,25 @@ public class SegmentReader extends Confi
 
     // remove the old file
     fs.delete(dumpFile, true);
-    FileStatus[] fstats = fs.listStatus(tempDir, 
HadoopFSUtil.getPassAllFilter());
+    FileStatus[] fstats = fs.listStatus(tempDir,
+        HadoopFSUtil.getPassAllFilter());
     Path[] files = HadoopFSUtil.getPaths(fstats);
 
     PrintWriter writer = null;
     int currentRecordNumber = 0;
     if (files.length > 0) {
-      writer = new PrintWriter(new BufferedWriter(new 
OutputStreamWriter(fs.create(dumpFile))));
+      writer = new PrintWriter(new BufferedWriter(new OutputStreamWriter(
+          fs.create(dumpFile))));
       try {
         for (int i = 0; i < files.length; i++) {
           Path partFile = files[i];
           try {
-            currentRecordNumber = append(fs, job, partFile, writer, 
currentRecordNumber);
+            currentRecordNumber = append(fs, job, partFile, writer,
+                currentRecordNumber);
           } catch (IOException exception) {
             if (LOG.isWarnEnabled()) {
-              LOG.warn("Couldn't copy the content of " + partFile.toString() +
-                       " into " + dumpFile.toString());
+              LOG.warn("Couldn't copy the content of " + partFile.toString()
+                  + " into " + dumpFile.toString());
               LOG.warn(exception.getMessage());
             }
           }
@@ -253,13 +271,16 @@ public class SegmentReader extends Confi
       }
     }
     fs.delete(tempDir, true);
-    if (LOG.isInfoEnabled()) { LOG.info("SegmentReader: done"); }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("SegmentReader: done");
+    }
   }
 
   /** Appends two files and updates the Recno counter */
-  private int append(FileSystem fs, Configuration conf, Path src, PrintWriter 
writer, int currentRecordNumber)
-          throws IOException {
-    BufferedReader reader = new BufferedReader(new 
InputStreamReader(fs.open(src)));
+  private int append(FileSystem fs, Configuration conf, Path src,
+      PrintWriter writer, int currentRecordNumber) throws IOException {
+    BufferedReader reader = new BufferedReader(new InputStreamReader(
+        fs.open(src)));
     try {
       String line = reader.readLine();
       while (line != null) {
@@ -276,89 +297,101 @@ public class SegmentReader extends Confi
   }
 
   private static final String[][] keys = new String[][] {
-          {"co", "Content::\n"},
-          {"ge", "Crawl Generate::\n"},
-          {"fe", "Crawl Fetch::\n"},
-          {"pa", "Crawl Parse::\n"},
-          {"pd", "ParseData::\n"},
-          {"pt", "ParseText::\n"}
-  };
+      { "co", "Content::\n" }, { "ge", "Crawl Generate::\n" },
+      { "fe", "Crawl Fetch::\n" }, { "pa", "Crawl Parse::\n" },
+      { "pd", "ParseData::\n" }, { "pt", "ParseText::\n" } };
 
   public void get(final Path segment, final Text key, Writer writer,
-          final Map<String, List<Writable>> results) throws Exception {
+      final Map<String, List<Writable>> results) throws Exception {
     LOG.info("SegmentReader: get '" + key + "'");
     ArrayList<Thread> threads = new ArrayList<Thread>();
-    if (co) threads.add(new Thread() {
-      public void run() {
-        try {
-          List<Writable> res = getMapRecords(new Path(segment, 
Content.DIR_NAME), key);
-          results.put("co", res);
-        } catch (Exception e) {
-          LOG.error("Exception:", e);
-        }
-      }
-    });
-    if (fe) threads.add(new Thread() {
-      public void run() {
-        try {
-          List<Writable> res = getMapRecords(new Path(segment, 
CrawlDatum.FETCH_DIR_NAME), key);
-          results.put("fe", res);
-        } catch (Exception e) {
-          LOG.error("Exception:", e);
-        }
-      }
-    });
-    if (ge) threads.add(new Thread() {
-      public void run() {
-        try {
-          List<Writable> res = getSeqRecords(new Path(segment, 
CrawlDatum.GENERATE_DIR_NAME), key);
-          results.put("ge", res);
-        } catch (Exception e) {
-          LOG.error("Exception:", e);
-        }
-      }
-    });
-    if (pa) threads.add(new Thread() {
-      public void run() {
-        try {
-          List<Writable> res = getSeqRecords(new Path(segment, 
CrawlDatum.PARSE_DIR_NAME), key);
-          results.put("pa", res);
-        } catch (Exception e) {
-          LOG.error("Exception:", e);
-        }
-      }
-    });
-    if (pd) threads.add(new Thread() {
-      public void run() {
-        try {
-          List<Writable> res = getMapRecords(new Path(segment, 
ParseData.DIR_NAME), key);
-          results.put("pd", res);
-        } catch (Exception e) {
-          LOG.error("Exception:", e);
-        }
-      }
-    });
-    if (pt) threads.add(new Thread() {
-      public void run() {
-        try {
-          List<Writable> res = getMapRecords(new Path(segment, 
ParseText.DIR_NAME), key);
-          results.put("pt", res);
-        } catch (Exception e) {
-          LOG.error("Exception:", e);
+    if (co)
+      threads.add(new Thread() {
+        public void run() {
+          try {
+            List<Writable> res = getMapRecords(new Path(segment,
+                Content.DIR_NAME), key);
+            results.put("co", res);
+          } catch (Exception e) {
+            LOG.error("Exception:", e);
+          }
         }
-      }
-    });
+      });
+    if (fe)
+      threads.add(new Thread() {
+        public void run() {
+          try {
+            List<Writable> res = getMapRecords(new Path(segment,
+                CrawlDatum.FETCH_DIR_NAME), key);
+            results.put("fe", res);
+          } catch (Exception e) {
+            LOG.error("Exception:", e);
+          }
+        }
+      });
+    if (ge)
+      threads.add(new Thread() {
+        public void run() {
+          try {
+            List<Writable> res = getSeqRecords(new Path(segment,
+                CrawlDatum.GENERATE_DIR_NAME), key);
+            results.put("ge", res);
+          } catch (Exception e) {
+            LOG.error("Exception:", e);
+          }
+        }
+      });
+    if (pa)
+      threads.add(new Thread() {
+        public void run() {
+          try {
+            List<Writable> res = getSeqRecords(new Path(segment,
+                CrawlDatum.PARSE_DIR_NAME), key);
+            results.put("pa", res);
+          } catch (Exception e) {
+            LOG.error("Exception:", e);
+          }
+        }
+      });
+    if (pd)
+      threads.add(new Thread() {
+        public void run() {
+          try {
+            List<Writable> res = getMapRecords(new Path(segment,
+                ParseData.DIR_NAME), key);
+            results.put("pd", res);
+          } catch (Exception e) {
+            LOG.error("Exception:", e);
+          }
+        }
+      });
+    if (pt)
+      threads.add(new Thread() {
+        public void run() {
+          try {
+            List<Writable> res = getMapRecords(new Path(segment,
+                ParseText.DIR_NAME), key);
+            results.put("pt", res);
+          } catch (Exception e) {
+            LOG.error("Exception:", e);
+          }
+        }
+      });
     Iterator<Thread> it = threads.iterator();
-    while (it.hasNext()) it.next().start();
+    while (it.hasNext())
+      it.next().start();
     int cnt;
     do {
       cnt = 0;
       try {
         Thread.sleep(5000);
-      } catch (Exception e) {};
+      } catch (Exception e) {
+      }
+      ;
       it = threads.iterator();
       while (it.hasNext()) {
-        if (it.next().isAlive()) cnt++;
+        if (it.next().isAlive())
+          cnt++;
       }
       if ((cnt > 0) && (LOG.isDebugEnabled())) {
         LOG.debug("(" + cnt + " to retrieve)");
@@ -375,24 +408,25 @@ public class SegmentReader extends Confi
       writer.flush();
     }
   }
-  
+
   private List<Writable> getMapRecords(Path dir, Text key) throws Exception {
-    MapFile.Reader[] readers = MapFileOutputFormat.getReaders(fs, dir, 
getConf());
+    MapFile.Reader[] readers = MapFileOutputFormat.getReaders(fs, dir,
+        getConf());
     ArrayList<Writable> res = new ArrayList<Writable>();
     Class<?> keyClass = readers[0].getKeyClass();
     Class<?> valueClass = readers[0].getValueClass();
     if (!keyClass.getName().equals("org.apache.hadoop.io.Text"))
       throw new IOException("Incompatible key (" + keyClass.getName() + ")");
-    Writable value = (Writable)valueClass.newInstance();
+    Writable value = (Writable) valueClass.newInstance();
     // we don't know the partitioning schema
     for (int i = 0; i < readers.length; i++) {
       if (readers[i].get(key, value) != null) {
         res.add(value);
-        value = (Writable)valueClass.newInstance();
+        value = (Writable) valueClass.newInstance();
         Text aKey = (Text) keyClass.newInstance();
         while (readers[i].next(aKey, value) && aKey.equals(key)) {
           res.add(value);
-          value = (Writable)valueClass.newInstance();
+          value = (Writable) valueClass.newInstance();
         }
       }
       readers[i].close();
@@ -401,19 +435,20 @@ public class SegmentReader extends Confi
   }
 
   private List<Writable> getSeqRecords(Path dir, Text key) throws Exception {
-    SequenceFile.Reader[] readers = 
SequenceFileOutputFormat.getReaders(getConf(), dir);
+    SequenceFile.Reader[] readers = SequenceFileOutputFormat.getReaders(
+        getConf(), dir);
     ArrayList<Writable> res = new ArrayList<Writable>();
     Class<?> keyClass = readers[0].getKeyClass();
     Class<?> valueClass = readers[0].getValueClass();
     if (!keyClass.getName().equals("org.apache.hadoop.io.Text"))
       throw new IOException("Incompatible key (" + keyClass.getName() + ")");
-    Writable aKey = (Writable)keyClass.newInstance();
-    Writable value = (Writable)valueClass.newInstance();
+    Writable aKey = (Writable) keyClass.newInstance();
+    Writable value = (Writable) valueClass.newInstance();
     for (int i = 0; i < readers.length; i++) {
       while (readers[i].next(aKey, value)) {
         if (aKey.equals(key)) {
           res.add(value);
-          value = (Writable)valueClass.newInstance();
+          value = (Writable) valueClass.newInstance();
         }
       }
       readers[i].close();
@@ -430,41 +465,55 @@ public class SegmentReader extends Confi
     public long parsed = -1L;
     public long parseErrors = -1L;
   }
-  
+
   SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
-  
+
   public void list(List<Path> dirs, Writer writer) throws Exception {
-    writer.write("NAME\t\tGENERATED\tFETCHER START\t\tFETCHER 
END\t\tFETCHED\tPARSED\n");
+    writer
+        .write("NAME\t\tGENERATED\tFETCHER START\t\tFETCHER 
END\t\tFETCHED\tPARSED\n");
     for (int i = 0; i < dirs.size(); i++) {
       Path dir = dirs.get(i);
       SegmentReaderStats stats = new SegmentReaderStats();
       getStats(dir, stats);
       writer.write(dir.getName() + "\t");
-      if (stats.generated == -1) writer.write("?");
-      else writer.write(stats.generated + "");
+      if (stats.generated == -1)
+        writer.write("?");
+      else
+        writer.write(stats.generated + "");
       writer.write("\t\t");
-      if (stats.start == -1) writer.write("?\t");
-      else writer.write(sdf.format(new Date(stats.start)));
+      if (stats.start == -1)
+        writer.write("?\t");
+      else
+        writer.write(sdf.format(new Date(stats.start)));
       writer.write("\t");
-      if (stats.end == -1) writer.write("?");
-      else writer.write(sdf.format(new Date(stats.end)));
+      if (stats.end == -1)
+        writer.write("?");
+      else
+        writer.write(sdf.format(new Date(stats.end)));
       writer.write("\t");
-      if (stats.fetched == -1) writer.write("?");
-      else writer.write(stats.fetched + "");
+      if (stats.fetched == -1)
+        writer.write("?");
+      else
+        writer.write(stats.fetched + "");
       writer.write("\t");
-      if (stats.parsed == -1) writer.write("?");
-      else writer.write(stats.parsed + "");
+      if (stats.parsed == -1)
+        writer.write("?");
+      else
+        writer.write(stats.parsed + "");
       writer.write("\n");
       writer.flush();
     }
   }
-  
-  public void getStats(Path segment, final SegmentReaderStats stats) throws 
Exception {
-    SequenceFile.Reader[] readers = 
SequenceFileOutputFormat.getReaders(getConf(), new Path(segment, 
CrawlDatum.GENERATE_DIR_NAME));
+
+  public void getStats(Path segment, final SegmentReaderStats stats)
+      throws Exception {
+    SequenceFile.Reader[] readers = SequenceFileOutputFormat.getReaders(
+        getConf(), new Path(segment, CrawlDatum.GENERATE_DIR_NAME));
     long cnt = 0L;
     Text key = new Text();
     for (int i = 0; i < readers.length; i++) {
-      while (readers[i].next(key)) cnt++;
+      while (readers[i].next(key))
+        cnt++;
       readers[i].close();
     }
     stats.generated = cnt;
@@ -474,12 +523,15 @@ public class SegmentReader extends Confi
       long start = Long.MAX_VALUE;
       long end = Long.MIN_VALUE;
       CrawlDatum value = new CrawlDatum();
-      MapFile.Reader[] mreaders = MapFileOutputFormat.getReaders(fs, fetchDir, 
getConf());
+      MapFile.Reader[] mreaders = MapFileOutputFormat.getReaders(fs, fetchDir,
+          getConf());
       for (int i = 0; i < mreaders.length; i++) {
         while (mreaders[i].next(key, value)) {
           cnt++;
-          if (value.getFetchTime() < start) start = value.getFetchTime();
-          if (value.getFetchTime() > end) end = value.getFetchTime();
+          if (value.getFetchTime() < start)
+            start = value.getFetchTime();
+          if (value.getFetchTime() > end)
+            end = value.getFetchTime();
         }
         mreaders[i].close();
       }
@@ -492,11 +544,13 @@ public class SegmentReader extends Confi
       cnt = 0L;
       long errors = 0L;
       ParseData value = new ParseData();
-      MapFile.Reader[] mreaders = MapFileOutputFormat.getReaders(fs, parseDir, 
getConf());
+      MapFile.Reader[] mreaders = MapFileOutputFormat.getReaders(fs, parseDir,
+          getConf());
       for (int i = 0; i < mreaders.length; i++) {
         while (mreaders[i].next(key, value)) {
           cnt++;
-          if (!value.getStatus().isSuccess()) errors++;
+          if (!value.getStatus().isSuccess())
+            errors++;
         }
         mreaders[i].close();
       }
@@ -504,7 +558,7 @@ public class SegmentReader extends Confi
       stats.parseErrors = errors;
     }
   }
-  
+
   private static final int MODE_DUMP = 0;
 
   private static final int MODE_LIST = 1;
@@ -521,7 +575,8 @@ public class SegmentReader extends Confi
       mode = MODE_DUMP;
     else if (args[0].equals("-list"))
       mode = MODE_LIST;
-    else if (args[0].equals("-get")) mode = MODE_GET;
+    else if (args[0].equals("-get"))
+      mode = MODE_GET;
 
     boolean co = true;
     boolean fe = true;
@@ -553,63 +608,69 @@ public class SegmentReader extends Confi
     }
     Configuration conf = NutchConfiguration.create();
     final FileSystem fs = FileSystem.get(conf);
-    SegmentReader segmentReader = new SegmentReader(conf, co, fe, ge, pa, pd, 
pt);
+    SegmentReader segmentReader = new SegmentReader(conf, co, fe, ge, pa, pd,
+        pt);
     // collect required args
     switch (mode) {
-      case MODE_DUMP:
-        String input = args[1];
-        if (input == null) {
-          System.err.println("Missing required argument: <segment_dir>");
-          usage();
-          return;
-        }
-        String output = args.length > 2 ? args[2] : null;
-        if (output == null) {
-          System.err.println("Missing required argument: <output>");
-          usage();
-          return;
-        }
-        segmentReader.dump(new Path(input), new Path(output));
+    case MODE_DUMP:
+      String input = args[1];
+      if (input == null) {
+        System.err.println("Missing required argument: <segment_dir>");
+        usage();
         return;
-      case MODE_LIST:
-        ArrayList<Path> dirs = new ArrayList<Path>();
-        for (int i = 1; i < args.length; i++) {
-          if (args[i] == null) continue;
-          if (args[i].equals("-dir")) {
-            Path dir = new Path(args[++i]);
-            FileStatus[] fstats = fs.listStatus(dir, 
HadoopFSUtil.getPassDirectoriesFilter(fs));
-            Path[] files = HadoopFSUtil.getPaths(fstats);
-            if (files != null && files.length > 0) {
-              dirs.addAll(Arrays.asList(files));
-            }
-          } else dirs.add(new Path(args[i]));
-        }
-        segmentReader.list(dirs, new OutputStreamWriter(System.out, "UTF-8"));
+      }
+      String output = args.length > 2 ? args[2] : null;
+      if (output == null) {
+        System.err.println("Missing required argument: <output>");
+        usage();
         return;
-      case MODE_GET:
-        input = args[1];
-        if (input == null) {
-          System.err.println("Missing required argument: <segment_dir>");
-          usage();
-          return;
-        }
-        String key = args.length > 2 ? args[2] : null;
-        if (key == null) {
-          System.err.println("Missing required argument: <keyValue>");
-          usage();
-          return;
-        }
-        segmentReader.get(new Path(input), new Text(key), new 
OutputStreamWriter(System.out, "UTF-8"), new HashMap<String, List<Writable>>());
+      }
+      segmentReader.dump(new Path(input), new Path(output));
+      return;
+    case MODE_LIST:
+      ArrayList<Path> dirs = new ArrayList<Path>();
+      for (int i = 1; i < args.length; i++) {
+        if (args[i] == null)
+          continue;
+        if (args[i].equals("-dir")) {
+          Path dir = new Path(args[++i]);
+          FileStatus[] fstats = fs.listStatus(dir,
+              HadoopFSUtil.getPassDirectoriesFilter(fs));
+          Path[] files = HadoopFSUtil.getPaths(fstats);
+          if (files != null && files.length > 0) {
+            dirs.addAll(Arrays.asList(files));
+          }
+        } else
+          dirs.add(new Path(args[i]));
+      }
+      segmentReader.list(dirs, new OutputStreamWriter(System.out, "UTF-8"));
+      return;
+    case MODE_GET:
+      input = args[1];
+      if (input == null) {
+        System.err.println("Missing required argument: <segment_dir>");
+        usage();
         return;
-      default:
-        System.err.println("Invalid operation: " + args[0]);
+      }
+      String key = args.length > 2 ? args[2] : null;
+      if (key == null) {
+        System.err.println("Missing required argument: <keyValue>");
         usage();
         return;
+      }
+      segmentReader.get(new Path(input), new Text(key), new OutputStreamWriter(
+          System.out, "UTF-8"), new HashMap<String, List<Writable>>());
+      return;
+    default:
+      System.err.println("Invalid operation: " + args[0]);
+      usage();
+      return;
     }
   }
 
   private static void usage() {
-    System.err.println("Usage: SegmentReader (-dump ... | -list ... | -get 
...) [general options]\n");
+    System.err
+        .println("Usage: SegmentReader (-dump ... | -list ... | -get ...) 
[general options]\n");
     System.err.println("* General options:");
     System.err.println("\t-nocontent\tignore content directory");
     System.err.println("\t-nofetch\tignore crawl_fetch directory");
@@ -618,21 +679,32 @@ public class SegmentReader extends Confi
     System.err.println("\t-noparsedata\tignore parse_data directory");
     System.err.println("\t-noparsetext\tignore parse_text directory");
     System.err.println();
-    System.err.println("* SegmentReader -dump <segment_dir> <output> [general 
options]");
-    System.err.println("  Dumps content of a <segment_dir> as a text file to 
<output>.\n");
+    System.err
+        .println("* SegmentReader -dump <segment_dir> <output> [general 
options]");
+    System.err
+        .println("  Dumps content of a <segment_dir> as a text file to 
<output>.\n");
     System.err.println("\t<segment_dir>\tname of the segment directory.");
-    System.err.println("\t<output>\tname of the (non-existent) output 
directory.");
+    System.err
+        .println("\t<output>\tname of the (non-existent) output directory.");
     System.err.println();
-    System.err.println("* SegmentReader -list (<segment_dir1> ... | -dir 
<segments>) [general options]");
-    System.err.println("  List a synopsis of segments in specified 
directories, or all segments in");
-    System.err.println("  a directory <segments>, and print it on 
System.out\n");
-    System.err.println("\t<segment_dir1> ...\tlist of segment directories to 
process");
-    System.err.println("\t-dir <segments>\t\tdirectory that contains multiple 
segments");
+    System.err
+        .println("* SegmentReader -list (<segment_dir1> ... | -dir <segments>) 
[general options]");
+    System.err
+        .println("  List a synopsis of segments in specified directories, or 
all segments in");
+    System.err
+        .println("  a directory <segments>, and print it on System.out\n");
+    System.err
+        .println("\t<segment_dir1> ...\tlist of segment directories to 
process");
+    System.err
+        .println("\t-dir <segments>\t\tdirectory that contains multiple 
segments");
     System.err.println();
-    System.err.println("* SegmentReader -get <segment_dir> <keyValue> [general 
options]");
-    System.err.println("  Get a specified record from a segment, and print it 
on System.out.\n");
+    System.err
+        .println("* SegmentReader -get <segment_dir> <keyValue> [general 
options]");
+    System.err
+        .println("  Get a specified record from a segment, and print it on 
System.out.\n");
     System.err.println("\t<segment_dir>\tname of the segment directory.");
     System.err.println("\t<keyValue>\tvalue of the key (url).");
-    System.err.println("\t\tNote: put double-quotes around strings with 
spaces.");
+    System.err
+        .println("\t\tNote: put double-quotes around strings with spaces.");
   }
 }

Modified: nutch/trunk/src/java/org/apache/nutch/segment/package-info.java
URL: 
http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/segment/package-info.java?rev=1655526&r1=1655525&r2=1655526&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/segment/package-info.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/segment/package-info.java Thu Jan 29 
05:38:59 2015
@@ -20,3 +20,4 @@
  * fetch list, protocol status, raw content, parsed content, and extracted 
outgoing links.
  */
 package org.apache.nutch.segment;
+

Modified: nutch/trunk/src/java/org/apache/nutch/tools/Benchmark.java
URL: 
http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/tools/Benchmark.java?rev=1655526&r1=1655525&r2=1655526&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/tools/Benchmark.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/tools/Benchmark.java Thu Jan 29 
05:38:59 2015
@@ -52,13 +52,14 @@ public class Benchmark extends Configure
     int res = ToolRunner.run(conf, new Benchmark(), args);
     System.exit(res);
   }
-  
+
   private static String getDate() {
-    return new SimpleDateFormat("yyyyMMddHHmmss").format
-      (new Date(System.currentTimeMillis()));
+    return new SimpleDateFormat("yyyyMMddHHmmss").format(new Date(System
+        .currentTimeMillis()));
   }
- 
-  private void createSeeds(FileSystem fs, Path seedsDir, int count) throws 
Exception {
+
+  private void createSeeds(FileSystem fs, Path seedsDir, int count)
+      throws Exception {
     OutputStream os = fs.create(new Path(seedsDir, "seeds"));
     for (int i = 0; i < count; i++) {
       String url = "http://www.test-"; + i + ".com/\r\n";
@@ -67,9 +68,9 @@ public class Benchmark extends Configure
     os.flush();
     os.close();
   }
-  
+
   public static final class BenchmarkResults {
-    Map<String,Map<String,Long>> timings = new 
HashMap<String,Map<String,Long>>();
+    Map<String, Map<String, Long>> timings = new HashMap<String, Map<String, 
Long>>();
     List<String> runs = new ArrayList<String>();
     List<String> stages = new ArrayList<String>();
     int seeds, depth, threads;
@@ -77,7 +78,7 @@ public class Benchmark extends Configure
     long topN;
     long elapsed;
     String plugins;
-    
+
     public void addTiming(String stage, String run, long timing) {
       if (!runs.contains(run)) {
         runs.add(run);
@@ -85,14 +86,14 @@ public class Benchmark extends Configure
       if (!stages.contains(stage)) {
         stages.add(stage);
       }
-      Map<String,Long> t = timings.get(stage);
+      Map<String, Long> t = timings.get(stage);
       if (t == null) {
-        t = new HashMap<String,Long>();
+        t = new HashMap<String, Long>();
         timings.put(stage, t);
       }
       t.put(run, timing);
     }
-    
+
     public String toString() {
       StringBuilder sb = new StringBuilder();
       sb.append("* Plugins:\t" + plugins + "\n");
@@ -103,8 +104,9 @@ public class Benchmark extends Configure
       sb.append("* Delete:\t" + delete + "\n");
       sb.append("* TOTAL ELAPSED:\t" + elapsed + "\n");
       for (String stage : stages) {
-        Map<String,Long> timing = timings.get(stage);
-        if (timing == null) continue;
+        Map<String, Long> timing = timings.get(stage);
+        if (timing == null)
+          continue;
         sb.append("- stage: " + stage + "\n");
         for (String r : runs) {
           Long Time = timing.get(r);
@@ -116,15 +118,16 @@ public class Benchmark extends Configure
       }
       return sb.toString();
     }
-    
+
     public List<String> getStages() {
       return stages;
     }
+
     public List<String> getRuns() {
       return runs;
     }
   }
-  
+
   public int run(String[] args) throws Exception {
     String plugins = 
"protocol-http|parse-tika|scoring-opic|urlfilter-regex|urlnormalizer-pass";
     int seeds = 1;
@@ -132,17 +135,24 @@ public class Benchmark extends Configure
     int threads = 10;
     boolean delete = true;
     long topN = Long.MAX_VALUE;
-    
+
     if (args.length == 0) {
-      System.err.println("Usage: Benchmark [-seeds NN] [-depth NN] [-threads 
NN] [-keep] [-maxPerHost NN] [-plugins <regex>]");
-      System.err.println("\t-seeds NN\tcreate NN unique hosts in a seed list 
(default: 1)");
+      System.err
+          .println("Usage: Benchmark [-seeds NN] [-depth NN] [-threads NN] 
[-keep] [-maxPerHost NN] [-plugins <regex>]");
+      System.err
+          .println("\t-seeds NN\tcreate NN unique hosts in a seed list 
(default: 1)");
       System.err.println("\t-depth NN\tperform NN crawl cycles (default: 10)");
-      System.err.println("\t-threads NN\tuse NN threads per Fetcher task 
(default: 10)");
-      System.err.println("\t-keep\tkeep segment data (default: delete after 
updatedb)");
+      System.err
+          .println("\t-threads NN\tuse NN threads per Fetcher task (default: 
10)");
+      System.err
+          .println("\t-keep\tkeep segment data (default: delete after 
updatedb)");
       System.err.println("\t-plugins <regex>\toverride 'plugin.includes'.");
-      System.err.println("\tNOTE: if not specified, this is reset to: " + 
plugins);
-      System.err.println("\tNOTE: if 'default' is specified then a value set 
in nutch-default/nutch-site is used.");
-      System.err.println("\t-maxPerHost NN\tmax. # of URLs per host in a 
fetchlist");
+      System.err.println("\tNOTE: if not specified, this is reset to: "
+          + plugins);
+      System.err
+          .println("\tNOTE: if 'default' is specified then a value set in 
nutch-default/nutch-site is used.");
+      System.err
+          .println("\t-maxPerHost NN\tmax. # of URLs per host in a fetchlist");
       return -1;
     }
     int maxPerHost = Integer.MAX_VALUE;
@@ -164,13 +174,15 @@ public class Benchmark extends Configure
         return -1;
       }
     }
-    BenchmarkResults res = benchmark(seeds, depth, threads, maxPerHost, topN, 
delete, plugins);
+    BenchmarkResults res = benchmark(seeds, depth, threads, maxPerHost, topN,
+        delete, plugins);
     System.out.println(res);
     return 0;
   }
-  
-  public BenchmarkResults benchmark(int seeds, int depth, int threads, int 
maxPerHost,
-        long topN, boolean delete, String plugins) throws Exception {
+
+  public BenchmarkResults benchmark(int seeds, int depth, int threads,
+      int maxPerHost, long topN, boolean delete, String plugins)
+      throws Exception {
     Configuration conf = getConf();
     conf.set("http.proxy.host", "localhost");
     conf.setInt("http.proxy.port", 8181);
@@ -180,11 +192,12 @@ public class Benchmark extends Configure
       conf.set("plugin.includes", plugins);
     }
     conf.setInt(Generator.GENERATOR_MAX_COUNT, maxPerHost);
-    conf.set(Generator.GENERATOR_COUNT_MODE, 
Generator.GENERATOR_COUNT_VALUE_HOST);
-    JobConf job = new NutchJob(getConf());    
+    conf.set(Generator.GENERATOR_COUNT_MODE,
+        Generator.GENERATOR_COUNT_VALUE_HOST);
+    JobConf job = new NutchJob(getConf());
     FileSystem fs = FileSystem.get(job);
-    Path dir = new Path(getConf().get("hadoop.tmp.dir"),
-            "bench-" + System.currentTimeMillis());
+    Path dir = new Path(getConf().get("hadoop.tmp.dir"), "bench-"
+        + System.currentTimeMillis());
     fs.mkdirs(dir);
     Path rootUrlDir = new Path(dir, "seed");
     fs.mkdirs(rootUrlDir);
@@ -194,7 +207,7 @@ public class Benchmark extends Configure
       LOG.info("crawl started in: " + dir);
       LOG.info("rootUrlDir = " + rootUrlDir);
       LOG.info("threads = " + threads);
-      LOG.info("depth = " + depth);      
+      LOG.info("depth = " + depth);
     }
     BenchmarkResults res = new BenchmarkResults();
     res.delete = delete;
@@ -213,17 +226,17 @@ public class Benchmark extends Configure
     ParseSegment parseSegment = new ParseSegment(getConf());
     CrawlDb crawlDbTool = new CrawlDb(getConf());
     LinkDb linkDbTool = new LinkDb(getConf());
-      
+
     // initialize crawlDb
     long start = System.currentTimeMillis();
     injector.inject(crawlDb, rootUrlDir);
     long delta = System.currentTimeMillis() - start;
     res.addTiming("inject", "0", delta);
     int i;
-    for (i = 0; i < depth; i++) {             // generate new segment
+    for (i = 0; i < depth; i++) { // generate new segment
       start = System.currentTimeMillis();
-      Path[] segs = generator.generate(crawlDb, segments, -1, topN, System
-          .currentTimeMillis());
+      Path[] segs = generator.generate(crawlDb, segments, -1, topN,
+          System.currentTimeMillis());
       delta = System.currentTimeMillis() - start;
       res.addTiming("generate", i + "", delta);
       if (segs == null) {
@@ -231,12 +244,12 @@ public class Benchmark extends Configure
         break;
       }
       start = System.currentTimeMillis();
-      fetcher.fetch(segs[0], threads);  // fetch it
+      fetcher.fetch(segs[0], threads); // fetch it
       delta = System.currentTimeMillis() - start;
       res.addTiming("fetch", i + "", delta);
       if (!Fetcher.isParsing(job)) {
         start = System.currentTimeMillis();
-        parseSegment.parse(segs[0]);    // parse it, if needed
+        parseSegment.parse(segs[0]); // parse it, if needed
         delta = System.currentTimeMillis() - start;
         res.addTiming("parse", i + "", delta);
       }
@@ -258,7 +271,9 @@ public class Benchmark extends Configure
     if (i == 0) {
       LOG.warn("No URLs to fetch - check your seed list and URL filters.");
     }
-    if (LOG.isInfoEnabled()) { LOG.info("crawl finished: " + dir); }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("crawl finished: " + dir);
+    }
     res.elapsed = System.currentTimeMillis() - res.elapsed;
     CrawlDbReader dbreader = new CrawlDbReader();
     dbreader.processStatJob(crawlDb.toString(), conf, false);


Reply via email to