Author: markus
Date: Fri Dec 16 11:17:10 2011
New Revision: 1215090
URL: http://svn.apache.org/viewvc?rev=1215090&view=rev
Log:
NUTCH-1221 Migrate DomainStatistics to MapReduce API
Modified:
nutch/trunk/CHANGES.txt
nutch/trunk/src/java/org/apache/nutch/util/domain/DomainStatistics.java
Modified: nutch/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/nutch/trunk/CHANGES.txt?rev=1215090&r1=1215089&r2=1215090&view=diff
==============================================================================
--- nutch/trunk/CHANGES.txt (original)
+++ nutch/trunk/CHANGES.txt Fri Dec 16 11:17:10 2011
@@ -1,5 +1,7 @@
Nutch Change Log
+* NUTCH-1221 Migrate DomainStatistics to MapReduce API (markus)
+
* NUTCH-1216 Add trivial comment to lib/native/README.txt (lewismc)
* NUTCH-1214 DomainStats tool should be named for what it's doing (markus)
Modified: nutch/trunk/src/java/org/apache/nutch/util/domain/DomainStatistics.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/util/domain/DomainStatistics.java?rev=1215090&r1=1215089&r2=1215090&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/util/domain/DomainStatistics.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/util/domain/DomainStatistics.java Fri Dec 16 11:17:10 2011
@@ -25,52 +25,43 @@ import java.util.Iterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.util.NutchConfiguration;
-import org.apache.nutch.util.NutchJob;
import org.apache.nutch.util.TimingUtil;
import org.apache.nutch.util.URLUtil;
/**
- * Extracts some very basic statistics about domains from the crawldb
+ * Extracts some very basic statistics about domains from the crawldb
*/
-public class DomainStatistics
-extends MapReduceBase
-implements Tool, Mapper<Text, CrawlDatum, Text, LongWritable>,
- Reducer<Text, LongWritable, LongWritable, Text> {
+public class DomainStatistics extends Configured implements Tool {
private static final Logger LOG =
LoggerFactory.getLogger(DomainStatistics.class);
-
+
private static final Text FETCHED_TEXT = new Text("FETCHED");
private static final Text NOT_FETCHED_TEXT = new Text("NOT_FETCHED");
-
+
public static enum MyCounter {FETCHED, NOT_FETCHED, EMPTY_RESULT};
-
+
private static final int MODE_HOST = 1;
private static final int MODE_DOMAIN = 2;
private static final int MODE_SUFFIX = 3;
-
+
private int mode = 0;
-
- private Configuration conf;
-
- public int run(String[] args) throws IOException {
+
+ public int run(String[] args) throws Exception {
if (args.length < 3) {
System.out.println("usage: DomainStatistics inputDirs outDir host|domain|suffix [numOfReducer]");
return 1;
@@ -78,7 +69,7 @@ implements Tool, Mapper<Text, CrawlDatum
String inputDir = args[0];
String outputDir = args[1];
int numOfReducers = 1;
-
+
if (args.length > 3) {
numOfReducers = Integer.parseInt(args[3]);
}
@@ -87,132 +78,125 @@ implements Tool, Mapper<Text, CrawlDatum
long start = System.currentTimeMillis();
LOG.info("DomainStatistics: starting at " + sdf.format(start));
- JobConf job = new NutchJob(getConf());
-
int mode = 0;
+ String jobName = "DomainStatistics";
if(args[2].equals("host")) {
- job.setJobName("Host statistics");
+ jobName = "Host statistics";
mode = MODE_HOST;
} else if(args[2].equals("domain")) {
- job.setJobName("Domain statistics");
+ jobName = "Domain statistics";
mode = MODE_DOMAIN;
} else if(args[2].equals("suffix")) {
- job.setJobName("Suffix statistics");
+ jobName = "Suffix statistics";
mode = MODE_SUFFIX;
}
- job.setInt("domain.statistics.mode", mode);
+ Configuration conf = getConf();
+ conf.setInt("domain.statistics.mode", mode);
+ conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
+
+ Job job = new Job(conf, jobName);
+ job.setJarByClass(DomainStatistics.class);
String[] inputDirsSpecs = inputDir.split(",");
for (int i = 0; i < inputDirsSpecs.length; i++) {
FileInputFormat.addInputPath(job, new Path(inputDirsSpecs[i]));
}
- job.setInputFormat(SequenceFileInputFormat.class);
- job.setMapperClass(DomainStatistics.class);
+ job.setInputFormatClass(SequenceFileInputFormat.class);
FileOutputFormat.setOutputPath(job, new Path(outputDir));
- job.setOutputFormat(TextOutputFormat.class);
+ job.setOutputFormatClass(TextOutputFormat.class);
+
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
- job.setReducerClass(DomainStatistics.class);
+
+ job.setMapperClass(DomainStatisticsMapper.class);
+ job.setReducerClass(DomainStatisticsReducer.class);
job.setCombinerClass(DomainStatisticsCombiner.class);
job.setNumReduceTasks(numOfReducers);
-
- JobClient.runJob(job);
-
+
+ try {
+ job.waitForCompletion(true);
+ } catch (Exception e) {
+ throw e;
+ }
+
long end = System.currentTimeMillis();
LOG.info("DomainStatistics: finished at " + sdf.format(end) + ", elapsed: " + TimingUtil.elapsedTime(start, end));
return 0;
}
- @Override
- public void configure(JobConf job) {
- super.configure(job);
- mode = job.getInt("domain.statistics.mode", MODE_DOMAIN);
- }
-
+ static class DomainStatisticsMapper extends Mapper<Text, CrawlDatum, Text, LongWritable> {
+ int mode = 0;
- public Configuration getConf() {
- return conf;
- }
+ public void setup(Context context) {
+ mode = context.getConfiguration().getInt("domain.statistics.mode", MODE_DOMAIN);
+ }
- public void setConf(Configuration conf) {
- this.conf = conf;
- }
+ public void map(Text urlText, CrawlDatum datum, Context context) throws IOException, InterruptedException {
- public void map(Text urlText, CrawlDatum datum,
- OutputCollector<Text, LongWritable> output, Reporter reporter)
- throws IOException {
-
- if(datum.getStatus() == CrawlDatum.STATUS_DB_FETCHED
- || datum.getStatus() == CrawlDatum.STATUS_DB_NOTMODIFIED) {
-
- try {
- URL url = new URL(urlText.toString());
- String out = null;
- switch (mode) {
- case MODE_HOST:
- out = url.getHost();
- break;
- case MODE_DOMAIN:
- out = URLUtil.getDomainName(url);
- break;
- case MODE_SUFFIX:
- out = URLUtil.getDomainSuffix(url).getDomain();
- break;
- }
- if(out.trim().equals("")) {
- LOG.info("url : " + url);
- reporter.incrCounter(MyCounter.EMPTY_RESULT, 1);
- }
-
- output.collect(new Text(out), new LongWritable(1));
- } catch (Exception ex) { }
- reporter.incrCounter(MyCounter.FETCHED, 1);
- output.collect(FETCHED_TEXT, new LongWritable(1));
- }
- else {
- reporter.incrCounter(MyCounter.NOT_FETCHED, 1);
- output.collect(NOT_FETCHED_TEXT, new LongWritable(1));
+ if(datum.getStatus() == CrawlDatum.STATUS_DB_FETCHED
+ || datum.getStatus() == CrawlDatum.STATUS_DB_NOTMODIFIED) {
+
+ try {
+ URL url = new URL(urlText.toString());
+ String out = null;
+ switch (mode) {
+ case MODE_HOST:
+ out = url.getHost();
+ break;
+ case MODE_DOMAIN:
+ out = URLUtil.getDomainName(url);
+ break;
+ case MODE_SUFFIX:
+ out = URLUtil.getDomainSuffix(url).getDomain();
+ break;
+ }
+ if(out.trim().equals("")) {
+ LOG.info("url : " + url);
+ context.getCounter(MyCounter.EMPTY_RESULT).increment(1);
+ }
+
+ context.write(new Text(out), new LongWritable(1));
+ } catch (Exception ex) { }
+
+ context.getCounter(MyCounter.FETCHED).increment(1);
+ context.write(FETCHED_TEXT, new LongWritable(1));
+ }
+ else {
+ context.getCounter(MyCounter.NOT_FETCHED).increment(1);
+ context.write(NOT_FETCHED_TEXT, new LongWritable(1));
+ }
}
}
- public void reduce(Text key, Iterator<LongWritable> values,
- OutputCollector<LongWritable, Text> output, Reporter reporter)
- throws IOException {
-
- long total = 0;
-
- while(values.hasNext()) {
- LongWritable val = values.next();
- total += val.get();
+ static class DomainStatisticsReducer extends Reducer <Text, LongWritable, LongWritable, Text> {
+ public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
+ long total = 0;
+
+ for (LongWritable val : values) {
+ total += val.get();
+ }
+
+ context.write(new LongWritable(total), key);
}
- //invert output
- output.collect(new LongWritable(total), key);
}
-
-
- public static class DomainStatisticsCombiner extends MapReduceBase
- implements Reducer<Text, LongWritable, Text, LongWritable> {
-
- public void reduce(Text key, Iterator<LongWritable> values,
- OutputCollector<Text, LongWritable> output, Reporter reporter)
- throws IOException {
+
+ public static class DomainStatisticsCombiner extends Reducer <Text, LongWritable, Text, LongWritable> {
+ public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long total = 0;
-
- while(values.hasNext()) {
- LongWritable val = values.next();
+
+ for (LongWritable val : values) {
total += val.get();
- }
- output.collect(key, new LongWritable(total));
+ }
+ context.write(key, new LongWritable(total));
}
-
}
public static void main(String[] args) throws Exception {
ToolRunner.run(NutchConfiguration.create(), new DomainStatistics(), args);
}
-
+
}