Author: markus
Date: Tue Dec 27 13:22:50 2011
New Revision: 1224905
URL: http://svn.apache.org/viewvc?rev=1224905&view=rev
Log:
Reverting NUTCH-1225 CrawlDBScanner (restores the old mapred-based implementation; the removed CHANGES.txt entry and the diff below correspond to NUTCH-1225, not NUTCH-1125)
Modified:
nutch/trunk/CHANGES.txt
nutch/trunk/src/java/org/apache/nutch/tools/CrawlDBScanner.java
Modified: nutch/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/nutch/trunk/CHANGES.txt?rev=1224905&r1=1224904&r2=1224905&view=diff
==============================================================================
--- nutch/trunk/CHANGES.txt (original)
+++ nutch/trunk/CHANGES.txt Tue Dec 27 13:22:50 2011
@@ -6,8 +6,6 @@ Nutch Change Log
* NUTCH-1184 Fetcher to parse and follow Nth degree outlinks (markus)
-* NUTCH-1225 Migrate CrawlDBScanner to MapReduce API (markus)
-
* NUTCH-1222 Upgrade to new Hadoop 0.22.0 (markus)
* NUTCH-1221 Migrate DomainStatistics to MapReduce API (markus)
Modified: nutch/trunk/src/java/org/apache/nutch/tools/CrawlDBScanner.java
URL:
http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/tools/CrawlDBScanner.java?rev=1224905&r1=1224904&r2=1224905&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/tools/CrawlDBScanner.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/tools/CrawlDBScanner.java Tue Dec 27
13:22:50 2011
@@ -26,20 +26,24 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.crawl.CrawlDb;
import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
import org.apache.nutch.util.TimingUtil;
/**
@@ -48,65 +52,71 @@ import org.apache.nutch.util.TimingUtil;
* used as a new CrawlDB. The dump mechanism of the crawldb reader is not very
* useful on large crawldbs as the ouput can be extremely large and the -url
* function can't help if we don't know what url we want to have a look at.
- *
+ *
* @author : Julien Nioche
*/
-public class CrawlDBScanner extends Configured implements Tool {
+
+public class CrawlDBScanner extends Configured implements Tool,
+ Mapper<Text,CrawlDatum,Text,CrawlDatum>,
Reducer<Text,CrawlDatum,Text,CrawlDatum> {
public static final Logger LOG =
LoggerFactory.getLogger(CrawlDBScanner.class);
+ public CrawlDBScanner() {}
- static class CrawlDBScannerMapper extends
Mapper<Text,CrawlDatum,Text,CrawlDatum> {
- private String regex = null;
- private String status = null;
-
- public void setup(Context context) {
- regex = context.getConfiguration().get("CrawlDBScanner.regex");
- status = context.getConfiguration().get("CrawlDBScanner.status");
- }
-
- public void map(Text url, CrawlDatum crawlDatum, Context context) throws
IOException, InterruptedException {
- // check status
- if (status != null
- &&
!status.equalsIgnoreCase(CrawlDatum.getStatusName(crawlDatum.getStatus())))
return;
-
- // if URL matched regexp dump it
- if (url.toString().matches(regex)) {
- context.write(url, crawlDatum);
- }
+ public CrawlDBScanner(Configuration conf) {
+ setConf(conf);
+ }
+
+ public void close() {}
+
+ private String regex = null;
+ private String status = null;
+
+ public void configure(JobConf job) {
+ regex = job.get("CrawlDBScanner.regex");
+ status = job.get("CrawlDBScanner.status");
+ }
+
+ public void map(Text url, CrawlDatum crawlDatum,
+ OutputCollector<Text,CrawlDatum> output, Reporter reporter) throws
IOException {
+
+ // check status
+ if (status != null
+ &&
!status.equalsIgnoreCase(CrawlDatum.getStatusName(crawlDatum.getStatus())))
return;
+
+ // if URL matched regexp dump it
+ if (url.toString().matches(regex)) {
+ output.collect(url, crawlDatum);
}
}
- static class CrawlDBScannerReducer extends Reducer
<Text,CrawlDatum,Text,CrawlDatum> {
- public void reduce(Text key, Iterable<CrawlDatum> values, Context context)
throws IOException, InterruptedException {
- for (CrawlDatum val : values) {
- context.write(key, val);
- }
+ public void reduce(Text key, Iterator<CrawlDatum> values,
+ OutputCollector<Text,CrawlDatum> output, Reporter reporter) throws
IOException {
+ while (values.hasNext()) {
+ CrawlDatum val = values.next();
+ output.collect(key, val);
}
}
private void scan(Path crawlDb, Path outputPath, String regex, String status,
- boolean text) throws Exception {
+ boolean text) throws IOException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long start = System.currentTimeMillis();
LOG.info("CrawlDB scanner: starting at " + sdf.format(start));
+ JobConf job = new NutchJob(getConf());
- Configuration conf = getConf();
- conf.set("CrawlDBScanner.regex", regex);
- if (status != null) conf.set("CrawlDBScanner.status", status);
- if (text) conf.set("mapred.output.compress", "false");
- conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
+ job.setJobName("Scan : " + crawlDb + " for URLS matching : " + regex);
- Job job = new Job(conf, "Scan : " + crawlDb + " for URLS matching : " +
regex);
- job.setJarByClass(CrawlDBScanner.class);
+ job.set("CrawlDBScanner.regex", regex);
+ if (status != null) job.set("CrawlDBScanner.status", status);
FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME));
- job.setInputFormatClass(SequenceFileInputFormat.class);
+ job.setInputFormat(SequenceFileInputFormat.class);
- job.setMapperClass(CrawlDBScannerMapper.class);
- job.setReducerClass(CrawlDBScannerReducer.class);
+ job.setMapperClass(CrawlDBScanner.class);
+ job.setReducerClass(CrawlDBScanner.class);
FileOutputFormat.setOutputPath(job, outputPath);
@@ -114,13 +124,14 @@ public class CrawlDBScanner extends Conf
// in order to check something - better to use the text format and avoid
// compression
if (text) {
- job.setOutputFormatClass(TextOutputFormat.class);
+ job.set("mapred.output.compress", "false");
+ job.setOutputFormat(TextOutputFormat.class);
}
// otherwise what we will actually create is a mini-crawlDB which can be
// then used
// for debugging
else {
- job.setOutputFormatClass(MapFileOutputFormat.class);
+ job.setOutputFormat(MapFileOutputFormat.class);
}
job.setMapOutputKeyClass(Text.class);
@@ -130,7 +141,7 @@ public class CrawlDBScanner extends Conf
job.setOutputValueClass(CrawlDatum.class);
try {
- job.waitForCompletion(true);
+ JobClient.runJob(job);
} catch (IOException e) {
throw e;
}