Author: markus
Date: Tue Dec 27 13:22:50 2011
New Revision: 1224905

URL: http://svn.apache.org/viewvc?rev=1224905&view=rev
Log:
Reverting NUTCH-1225 CrawlDBScanner

Modified:
    nutch/trunk/CHANGES.txt
    nutch/trunk/src/java/org/apache/nutch/tools/CrawlDBScanner.java

Modified: nutch/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/nutch/trunk/CHANGES.txt?rev=1224905&r1=1224904&r2=1224905&view=diff
==============================================================================
--- nutch/trunk/CHANGES.txt (original)
+++ nutch/trunk/CHANGES.txt Tue Dec 27 13:22:50 2011
@@ -6,8 +6,6 @@ Nutch Change Log
 
 * NUTCH-1184 Fetcher to parse and follow Nth degree outlinks (markus)
 
-* NUTCH-1225 Migrate CrawlDBScanner to MapReduce API (markus)
-
 * NUTCH-1222 Upgrade to new Hadoop 0.22.0 (markus)
 
 * NUTCH-1221 Migrate DomainStatistics to MapReduce API (markus)

Modified: nutch/trunk/src/java/org/apache/nutch/tools/CrawlDBScanner.java
URL: 
http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/tools/CrawlDBScanner.java?rev=1224905&r1=1224904&r2=1224905&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/tools/CrawlDBScanner.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/tools/CrawlDBScanner.java Tue Dec 27 
13:22:50 2011
@@ -26,20 +26,24 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.nutch.crawl.CrawlDatum;
 import org.apache.nutch.crawl.CrawlDb;
 import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
 import org.apache.nutch.util.TimingUtil;
 
 /**
@@ -48,65 +52,71 @@ import org.apache.nutch.util.TimingUtil;
  * used as a new CrawlDB. The dump mechanism of the crawldb reader is not very
 * useful on large crawldbs as the output can be extremely large and the -url
  * function can't help if we don't know what url we want to have a look at.
- *
+ * 
  * @author : Julien Nioche
  */
-public class CrawlDBScanner extends Configured implements Tool {
+
+public class CrawlDBScanner extends Configured implements Tool,
+    Mapper<Text,CrawlDatum,Text,CrawlDatum>, 
Reducer<Text,CrawlDatum,Text,CrawlDatum> {
 
   public static final Logger LOG = 
LoggerFactory.getLogger(CrawlDBScanner.class);
 
+  public CrawlDBScanner() {}
 
-  static class CrawlDBScannerMapper extends 
Mapper<Text,CrawlDatum,Text,CrawlDatum> {
-    private String regex = null;
-    private String status = null;
-
-    public void setup(Context context) {
-      regex = context.getConfiguration().get("CrawlDBScanner.regex");
-      status = context.getConfiguration().get("CrawlDBScanner.status");
-    }
-
-    public void map(Text url, CrawlDatum crawlDatum, Context context) throws 
IOException, InterruptedException {
-      // check status
-      if (status != null
-          && 
!status.equalsIgnoreCase(CrawlDatum.getStatusName(crawlDatum.getStatus()))) 
return;
-
-      // if URL matched regexp dump it
-      if (url.toString().matches(regex)) {
-        context.write(url, crawlDatum);
-      }
+  public CrawlDBScanner(Configuration conf) {
+    setConf(conf);
+  }
+
+  public void close() {}
+
+  private String regex = null;
+  private String status = null;
+
+  public void configure(JobConf job) {
+    regex = job.get("CrawlDBScanner.regex");
+    status = job.get("CrawlDBScanner.status");
+  }
+
+  public void map(Text url, CrawlDatum crawlDatum,
+      OutputCollector<Text,CrawlDatum> output, Reporter reporter) throws 
IOException {
+
+    // check status
+    if (status != null
+        && 
!status.equalsIgnoreCase(CrawlDatum.getStatusName(crawlDatum.getStatus()))) 
return;
+
+    // if URL matched regexp dump it
+    if (url.toString().matches(regex)) {
+      output.collect(url, crawlDatum);
     }
   }
 
-  static class CrawlDBScannerReducer extends Reducer 
<Text,CrawlDatum,Text,CrawlDatum> {
-    public void reduce(Text key, Iterable<CrawlDatum> values, Context context) 
throws IOException, InterruptedException {
-      for (CrawlDatum val : values) {
-        context.write(key, val);
-      }
+  public void reduce(Text key, Iterator<CrawlDatum> values,
+      OutputCollector<Text,CrawlDatum> output, Reporter reporter) throws 
IOException {
+    while (values.hasNext()) {
+      CrawlDatum val = values.next();
+      output.collect(key, val);
     }
   }
 
   private void scan(Path crawlDb, Path outputPath, String regex, String status,
-      boolean text) throws Exception {
+      boolean text) throws IOException {
 
     SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
     long start = System.currentTimeMillis();
     LOG.info("CrawlDB scanner: starting at " + sdf.format(start));
 
+    JobConf job = new NutchJob(getConf());
 
-    Configuration conf = getConf();
-    conf.set("CrawlDBScanner.regex", regex);
-    if (status != null) conf.set("CrawlDBScanner.status", status);
-    if (text) conf.set("mapred.output.compress", "false");
-    conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
+    job.setJobName("Scan : " + crawlDb + " for URLS matching : " + regex);
 
-    Job job = new Job(conf, "Scan : " + crawlDb + " for URLS matching : " + 
regex);
-    job.setJarByClass(CrawlDBScanner.class);
+    job.set("CrawlDBScanner.regex", regex);
+    if (status != null) job.set("CrawlDBScanner.status", status);
 
     FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME));
-    job.setInputFormatClass(SequenceFileInputFormat.class);
+    job.setInputFormat(SequenceFileInputFormat.class);
 
-    job.setMapperClass(CrawlDBScannerMapper.class);
-    job.setReducerClass(CrawlDBScannerReducer.class);
+    job.setMapperClass(CrawlDBScanner.class);
+    job.setReducerClass(CrawlDBScanner.class);
 
     FileOutputFormat.setOutputPath(job, outputPath);
 
@@ -114,13 +124,14 @@ public class CrawlDBScanner extends Conf
     // in order to check something - better to use the text format and avoid
     // compression
     if (text) {
-      job.setOutputFormatClass(TextOutputFormat.class);
+      job.set("mapred.output.compress", "false");
+      job.setOutputFormat(TextOutputFormat.class);
     }
     // otherwise what we will actually create is a mini-crawlDB which can be
     // then used
     // for debugging
     else {
-      job.setOutputFormatClass(MapFileOutputFormat.class);
+      job.setOutputFormat(MapFileOutputFormat.class);
     }
 
     job.setMapOutputKeyClass(Text.class);
@@ -130,7 +141,7 @@ public class CrawlDBScanner extends Conf
     job.setOutputValueClass(CrawlDatum.class);
 
     try {
-      job.waitForCompletion(true);
+      JobClient.runJob(job);
     } catch (IOException e) {
       throw e;
     }


Reply via email to