Author: markus
Date: Mon Dec 19 15:15:43 2011
New Revision: 1220788
URL: http://svn.apache.org/viewvc?rev=1220788&view=rev
Log:
NUTCH-1225 Migrate CrawlDBScanner to MapReduce API
Modified:
nutch/trunk/CHANGES.txt
nutch/trunk/src/java/org/apache/nutch/tools/CrawlDBScanner.java
Modified: nutch/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/nutch/trunk/CHANGES.txt?rev=1220788&r1=1220787&r2=1220788&view=diff
==============================================================================
--- nutch/trunk/CHANGES.txt (original)
+++ nutch/trunk/CHANGES.txt Mon Dec 19 15:15:43 2011
@@ -1,5 +1,7 @@
Nutch Change Log
+* NUTCH-1225 Migrate CrawlDBScanner to MapReduce API (markus)
+
* NUTCH-1222 Upgrade to new Hadoop 0.22.0 (markus)
* NUTCH-1221 Migrate DomainStatistics to MapReduce API (markus)
Modified: nutch/trunk/src/java/org/apache/nutch/tools/CrawlDBScanner.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/tools/CrawlDBScanner.java?rev=1220788&r1=1220787&r2=1220788&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/tools/CrawlDBScanner.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/tools/CrawlDBScanner.java Mon Dec 19 15:15:43 2011
@@ -26,24 +26,20 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapFileOutputFormat;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.crawl.CrawlDb;
import org.apache.nutch.util.NutchConfiguration;
-import org.apache.nutch.util.NutchJob;
import org.apache.nutch.util.TimingUtil;
/**
@@ -52,71 +48,65 @@ import org.apache.nutch.util.TimingUtil;
* used as a new CrawlDB. The dump mechanism of the crawldb reader is not very
* useful on large crawldbs as the output can be extremely large and the -url
* function can't help if we don't know what url we want to have a look at.
- *
+ *
* @author : Julien Nioche
*/
-
-public class CrawlDBScanner extends Configured implements Tool,
-    Mapper<Text,CrawlDatum,Text,CrawlDatum>, Reducer<Text,CrawlDatum,Text,CrawlDatum> {
+public class CrawlDBScanner extends Configured implements Tool {
public static final Logger LOG = LoggerFactory.getLogger(CrawlDBScanner.class);
- public CrawlDBScanner() {}
-
- public CrawlDBScanner(Configuration conf) {
- setConf(conf);
- }
-
- public void close() {}
-
- private String regex = null;
- private String status = null;
- public void configure(JobConf job) {
- regex = job.get("CrawlDBScanner.regex");
- status = job.get("CrawlDBScanner.status");
- }
-
- public void map(Text url, CrawlDatum crawlDatum,
-      OutputCollector<Text,CrawlDatum> output, Reporter reporter) throws IOException {
-
- // check status
- if (status != null
-        && !status.equalsIgnoreCase(CrawlDatum.getStatusName(crawlDatum.getStatus()))) return;
-
- // if URL matched regexp dump it
- if (url.toString().matches(regex)) {
- output.collect(url, crawlDatum);
+  static class CrawlDBScannerMapper extends Mapper<Text,CrawlDatum,Text,CrawlDatum> {
+ private String regex = null;
+ private String status = null;
+
+ public void setup(Context context) {
+ regex = context.getConfiguration().get("CrawlDBScanner.regex");
+ status = context.getConfiguration().get("CrawlDBScanner.status");
+ }
+
+    public void map(Text url, CrawlDatum crawlDatum, Context context) throws IOException, InterruptedException {
+ // check status
+ if (status != null
+          && !status.equalsIgnoreCase(CrawlDatum.getStatusName(crawlDatum.getStatus()))) return;
+
+ // if URL matched regexp dump it
+ if (url.toString().matches(regex)) {
+ context.write(url, crawlDatum);
+ }
}
}
- public void reduce(Text key, Iterator<CrawlDatum> values,
-      OutputCollector<Text,CrawlDatum> output, Reporter reporter) throws IOException {
- while (values.hasNext()) {
- CrawlDatum val = values.next();
- output.collect(key, val);
+  static class CrawlDBScannerReducer extends Reducer<Text,CrawlDatum,Text,CrawlDatum> {
+    public void reduce(Text key, Iterable<CrawlDatum> values, Context context) throws IOException, InterruptedException {
+ for (CrawlDatum val : values) {
+ context.write(key, val);
+ }
}
}
private void scan(Path crawlDb, Path outputPath, String regex, String status,
- boolean text) throws IOException {
+ boolean text) throws Exception {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long start = System.currentTimeMillis();
LOG.info("CrawlDB scanner: starting at " + sdf.format(start));
- JobConf job = new NutchJob(getConf());
- job.setJobName("Scan : " + crawlDb + " for URLS matching : " + regex);
+ Configuration conf = getConf();
+ conf.set("CrawlDBScanner.regex", regex);
+ if (status != null) conf.set("CrawlDBScanner.status", status);
+ if (text) conf.set("mapred.output.compress", "false");
+ conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
- job.set("CrawlDBScanner.regex", regex);
- if (status != null) job.set("CrawlDBScanner.status", status);
+    Job job = new Job(conf, "Scan : " + crawlDb + " for URLS matching : " + regex);
+ job.setJarByClass(CrawlDBScanner.class);
FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME));
- job.setInputFormat(SequenceFileInputFormat.class);
+ job.setInputFormatClass(SequenceFileInputFormat.class);
- job.setMapperClass(CrawlDBScanner.class);
- job.setReducerClass(CrawlDBScanner.class);
+ job.setMapperClass(CrawlDBScannerMapper.class);
+ job.setReducerClass(CrawlDBScannerReducer.class);
FileOutputFormat.setOutputPath(job, outputPath);
@@ -124,14 +114,13 @@ public class CrawlDBScanner extends Conf
// in order to check something - better to use the text format and avoid
// compression
if (text) {
- job.set("mapred.output.compress", "false");
- job.setOutputFormat(TextOutputFormat.class);
+ job.setOutputFormatClass(TextOutputFormat.class);
}
// otherwise what we will actually create is a mini-crawlDB which can be
// then used
// for debugging
else {
- job.setOutputFormat(MapFileOutputFormat.class);
+ job.setOutputFormatClass(MapFileOutputFormat.class);
}
job.setMapOutputKeyClass(Text.class);
@@ -141,7 +130,7 @@ public class CrawlDBScanner extends Conf
job.setOutputValueClass(CrawlDatum.class);
try {
- JobClient.runJob(job);
+ job.waitForCompletion(true);
} catch (IOException e) {
throw e;
}
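
The change above follows the standard migration from the deprecated org.apache.hadoop.mapred API to org.apache.hadoop.mapreduce: configure(JobConf) becomes setup(Context), OutputCollector.collect() becomes Context.write(), Iterator<V> in the reducer becomes Iterable<V>, and JobClient.runJob(job) becomes job.waitForCompletion(true). A minimal self-contained sketch of that pattern follows; the class and property names here are illustrative only and are not part of this commit:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// Illustrative sketch of the new-API structure used by the commit above.
public class NewApiSketch {

  // New API: per-task parameters are read in setup() from the job's
  // Configuration instead of configure(JobConf), and records are emitted
  // via context.write() instead of OutputCollector.collect().
  static class PassThroughMapper extends Mapper<Text, Text, Text, Text> {
    private String filter = null;

    @Override
    protected void setup(Context context) {
      filter = context.getConfiguration().get("sketch.filter");
    }

    @Override
    protected void map(Text key, Text value, Context context)
        throws IOException, InterruptedException {
      if (filter == null || key.toString().matches(filter)) {
        context.write(key, value);
      }
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Job-scoped settings must be placed on the Configuration before the
    // Job object is created, which is why scan() above sets
    // CrawlDBScanner.regex and friends on conf first.
    conf.set("sketch.filter", ".*");

    Job job = new Job(conf, "new-API sketch");    // replaces new JobConf(...)
    job.setJarByClass(NewApiSketch.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    job.setInputFormatClass(SequenceFileInputFormat.class);  // was setInputFormat()
    job.setOutputFormatClass(TextOutputFormat.class);        // was setOutputFormat()
    job.setMapperClass(PassThroughMapper.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    job.waitForCompletion(true);                  // replaces JobClient.runJob(job)
  }
}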