Author: jnioche
Date: Thu May 15 08:14:38 2014
New Revision: 1594813
URL: http://svn.apache.org/r1594813
Log:
NUTCH-1674 Use batchId filter to enable scan (GORA-119) for
Fetch,Parse,Update,Index (Tien Nguyen Manh and Alparslan Avcı via jnioche)
Modified:
nutch/branches/2.x/CHANGES.txt
nutch/branches/2.x/src/java/org/apache/nutch/crawl/DbUpdateMapper.java
nutch/branches/2.x/src/java/org/apache/nutch/crawl/DbUpdaterJob.java
nutch/branches/2.x/src/java/org/apache/nutch/fetcher/FetcherJob.java
nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexingJob.java
nutch/branches/2.x/src/java/org/apache/nutch/parse/ParserJob.java
nutch/branches/2.x/src/java/org/apache/nutch/storage/Mark.java
nutch/branches/2.x/src/java/org/apache/nutch/storage/StorageUtils.java
Modified: nutch/branches/2.x/CHANGES.txt
URL:
http://svn.apache.org/viewvc/nutch/branches/2.x/CHANGES.txt?rev=1594813&r1=1594812&r2=1594813&view=diff
==============================================================================
--- nutch/branches/2.x/CHANGES.txt (original)
+++ nutch/branches/2.x/CHANGES.txt Thu May 15 08:14:38 2014
@@ -2,6 +2,8 @@ Nutch Change Log
Current Development
+* NUTCH-1674 Use batchId filter to enable scan (GORA-119) for Fetch,Parse,Update,Index (Tien Nguyen Manh and Alparslan Avcı via jnioche)
+
* NUTCH-1714 Upgrade to Gora 0.4 (Alparslan Avcı via jnioche)
* NUTCH-1752 Cache robots.txt rules per protocol:host:port (snagel)
Modified: nutch/branches/2.x/src/java/org/apache/nutch/crawl/DbUpdateMapper.java
URL:
http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/crawl/DbUpdateMapper.java?rev=1594813&r1=1594812&r2=1594813&view=diff
==============================================================================
--- nutch/branches/2.x/src/java/org/apache/nutch/crawl/DbUpdateMapper.java (original)
+++ nutch/branches/2.x/src/java/org/apache/nutch/crawl/DbUpdateMapper.java Thu May 15 08:14:38 2014
@@ -55,15 +55,13 @@ extends GoraMapper<String, WebPage, UrlW
@Override
public void map(String key, WebPage page, Context context)
throws IOException, InterruptedException {
-
- Utf8 mark = Mark.GENERATE_MARK.checkMark(page);
- if(!NutchJob.shouldProcess(mark,batchId)) {
+ if(Mark.GENERATE_MARK.checkMark(page) == null) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Skipping " + TableUtil.unreverseUrl(key) + "; different batch id (" + mark + ")");
+ LOG.debug("Skipping " + TableUtil.unreverseUrl(key) + "; not generated yet");
}
return;
}
-
+
String url = TableUtil.unreverseUrl(key);
scoreData.clear();
Modified: nutch/branches/2.x/src/java/org/apache/nutch/crawl/DbUpdaterJob.java
URL:
http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/crawl/DbUpdaterJob.java?rev=1594813&r1=1594812&r2=1594813&view=diff
==============================================================================
--- nutch/branches/2.x/src/java/org/apache/nutch/crawl/DbUpdaterJob.java (original)
+++ nutch/branches/2.x/src/java/org/apache/nutch/crawl/DbUpdaterJob.java Thu May 15 08:14:38 2014
@@ -22,6 +22,9 @@ import java.util.HashSet;
import java.util.Map;
import org.apache.avro.util.Utf8;
+import org.apache.gora.filter.FilterOp;
+import org.apache.gora.filter.MapFieldValueFilter;
+import org.apache.gora.filter.SingleFieldValueFilter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -30,6 +33,7 @@ import org.apache.nutch.crawl.UrlWithSco
import org.apache.nutch.crawl.UrlWithScore.UrlScoreComparator.UrlOnlyComparator;
import org.apache.nutch.metadata.Nutch;
import org.apache.nutch.scoring.ScoringFilters;
+import org.apache.nutch.storage.Mark;
import org.apache.nutch.storage.StorageUtils;
import org.apache.nutch.storage.WebPage;
import org.apache.nutch.util.NutchConfiguration;
@@ -103,14 +107,28 @@ public class DbUpdaterJob extends NutchT
currentJob.setSortComparatorClass(UrlScoreComparator.class);
currentJob.setGroupingComparatorClass(UrlOnlyComparator.class);
+ MapFieldValueFilter<String, WebPage> batchIdFilter = getBatchIdFilter(batchId);
StorageUtils.initMapperJob(currentJob, fields, UrlWithScore.class,
- NutchWritable.class, DbUpdateMapper.class);
+ NutchWritable.class, DbUpdateMapper.class, batchIdFilter);
StorageUtils.initReducerJob(currentJob, DbUpdateReducer.class);
currentJob.waitForCompletion(true);
ToolUtil.recordJobStatus(null, currentJob, results);
return results;
}
-
+
+ private MapFieldValueFilter<String, WebPage> getBatchIdFilter(String batchId) {
+ if (batchId.equals(Nutch.ALL_CRAWL_ID.toString())) {
+ return null;
+ }
+ MapFieldValueFilter<String, WebPage> filter = new MapFieldValueFilter<String, WebPage>();
+ filter.setFieldName(WebPage.Field.MARKERS.toString());
+ filter.setFilterOp(FilterOp.EQUALS);
+ filter.setFilterIfMissing(true);
+ filter.setMapKey(Mark.GENERATE_MARK.getName());
+ filter.getOperands().add(new Utf8(batchId));
+ return filter;
+ }
+
private int updateTable(String crawlId,String batchId) throws Exception {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Modified: nutch/branches/2.x/src/java/org/apache/nutch/fetcher/FetcherJob.java
URL:
http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/fetcher/FetcherJob.java?rev=1594813&r1=1594812&r2=1594813&view=diff
==============================================================================
--- nutch/branches/2.x/src/java/org/apache/nutch/fetcher/FetcherJob.java (original)
+++ nutch/branches/2.x/src/java/org/apache/nutch/fetcher/FetcherJob.java Thu May 15 08:14:38 2014
@@ -26,6 +26,8 @@ import java.util.Random;
import java.util.StringTokenizer;
import org.apache.avro.util.Utf8;
+import org.apache.gora.filter.FilterOp;
+import org.apache.gora.filter.MapFieldValueFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -109,10 +111,10 @@ public class FetcherJob extends NutchToo
@Override
protected void map(String key, WebPage page, Context context)
throws IOException, InterruptedException {
- Utf8 mark = Mark.GENERATE_MARK.checkMark(page);
- if (!NutchJob.shouldProcess(mark, batchId)) {
+ if (Mark.GENERATE_MARK.checkMark(page) == null) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Skipping " + TableUtil.unreverseUrl(key) + "; different batch id (" + mark + ")");
+ LOG.debug("Skipping " + TableUtil.unreverseUrl(key)
+ + "; not generated yet");
}
return;
}
@@ -188,8 +190,10 @@ public class FetcherJob extends NutchToo
currentJob.setReduceSpeculativeExecution(false);
Collection<WebPage.Field> fields = getFields(currentJob);
+ MapFieldValueFilter<String, WebPage> batchIdFilter = getBatchIdFilter(batchId);
StorageUtils.initMapperJob(currentJob, fields, IntWritable.class,
- FetchEntry.class, FetcherMapper.class, FetchEntryPartitioner.class, false);
+ FetchEntry.class, FetcherMapper.class, FetchEntryPartitioner.class,
+ batchIdFilter, false);
StorageUtils.initReducerJob(currentJob, FetcherReducer.class);
if (numTasks == null || numTasks < 1) {
currentJob.setNumReduceTasks(currentJob.getConfiguration().getInt("mapred.map.tasks",
@@ -202,7 +206,20 @@ public class FetcherJob extends NutchToo
return results;
}
- /**
+ private MapFieldValueFilter<String, WebPage> getBatchIdFilter(String batchId) {
+ if (batchId.equals(Nutch.ALL_CRAWL_ID.toString())) {
+ return null;
+ }
+ MapFieldValueFilter<String, WebPage> filter = new MapFieldValueFilter<String, WebPage>();
+ filter.setFieldName(WebPage.Field.MARKERS.toString());
+ filter.setFilterOp(FilterOp.EQUALS);
+ filter.setFilterIfMissing(true);
+ filter.setMapKey(Mark.GENERATE_MARK.getName());
+ filter.getOperands().add(new Utf8(batchId));
+ return filter;
+ }
+
+ /**
* Run fetcher.
* @param batchId batchId (obtained from Generator) or null to fetch all generated fetchlists
* @param threads number of threads per map task
Modified: nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexingJob.java
URL:
http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexingJob.java?rev=1594813&r1=1594812&r2=1594813&view=diff
==============================================================================
--- nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexingJob.java (original)
+++ nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexingJob.java Thu May 15 08:14:38 2014
@@ -22,6 +22,8 @@ import java.util.HashSet;
import java.util.Map;
import org.apache.avro.util.Utf8;
+import org.apache.gora.filter.FilterOp;
+import org.apache.gora.filter.MapFieldValueFilter;
import org.apache.gora.mapreduce.GoraMapper;
import org.apache.gora.mapreduce.StringComparator;
import org.apache.gora.store.DataStore;
@@ -99,14 +101,12 @@ public class IndexingJob extends NutchTo
}
Utf8 mark = Mark.UPDATEDB_MARK.checkMark(page);
- if (!batchId.equals(REINDEX)) {
- if (!NutchJob.shouldProcess(mark, batchId)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Skipping " + TableUtil.unreverseUrl(key)
- + "; different batch id (" + mark + ")");
- }
- return;
+ if (mark == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skipping " + TableUtil.unreverseUrl(key)
+ + "; not updated on db yet");
}
+ return;
}
NutchDocument doc = indexUtil.index(key, page);
@@ -145,8 +145,9 @@ public class IndexingJob extends NutchTo
StringComparator.class, RawComparator.class);
Collection<WebPage.Field> fields = getFields(job);
+ MapFieldValueFilter<String, WebPage> batchIdFilter = getBatchIdFilter(batchId);
StorageUtils.initMapperJob(job, fields, String.class, NutchDocument.class,
- IndexerMapper.class);
+ IndexerMapper.class, batchIdFilter);
job.setNumReduceTasks(0);
job.setOutputFormatClass(IndexerOutputFormat.class);
@@ -155,6 +156,20 @@ public class IndexingJob extends NutchTo
return results;
}
+ private MapFieldValueFilter<String, WebPage> getBatchIdFilter(String batchId) {
+ if (batchId.equals(REINDEX.toString())
+ || batchId.equals(Nutch.ALL_CRAWL_ID.toString())) {
+ return null;
+ }
+ MapFieldValueFilter<String, WebPage> filter = new MapFieldValueFilter<String, WebPage>();
+ filter.setFieldName(WebPage.Field.MARKERS.toString());
+ filter.setFilterOp(FilterOp.EQUALS);
+ filter.setFilterIfMissing(true);
+ filter.setMapKey(Mark.UPDATEDB_MARK.getName());
+ filter.getOperands().add(new Utf8(batchId));
+ return filter;
+ }
+
public void index(String batchId) throws Exception {
LOG.info("IndexingJob: starting");
Modified: nutch/branches/2.x/src/java/org/apache/nutch/parse/ParserJob.java
URL:
http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/parse/ParserJob.java?rev=1594813&r1=1594812&r2=1594813&view=diff
==============================================================================
--- nutch/branches/2.x/src/java/org/apache/nutch/parse/ParserJob.java (original)
+++ nutch/branches/2.x/src/java/org/apache/nutch/parse/ParserJob.java Thu May 15 08:14:38 2014
@@ -24,6 +24,7 @@ import java.util.HashSet;
import java.util.Map;
import org.apache.avro.util.Utf8;
+import org.apache.gora.filter.MapFieldValueFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -47,6 +48,8 @@ import org.apache.nutch.util.StringUtil;
import org.apache.nutch.util.TableUtil;
import org.apache.nutch.util.TimingUtil;
import org.apache.nutch.util.ToolUtil;
+import org.apache.gora.filter.FilterOp;
+import org.apache.gora.filter.SingleFieldValueFilter;
import org.apache.gora.mapreduce.GoraMapper;
public class ParserJob extends NutchTool implements Tool {
@@ -102,14 +105,14 @@ public class ParserJob extends NutchTool
@Override
public void map(String key, WebPage page, Context context)
throws IOException, InterruptedException {
- CharSequence mark = Mark.FETCH_MARK.checkMark(page);
String unreverseKey = TableUtil.unreverseUrl(key);
if (batchId.equals(REPARSE)) {
LOG.debug("Reparsing " + unreverseKey);
} else {
- if (!NutchJob.shouldProcess(mark, batchId)) {
+ if (Mark.FETCH_MARK.checkMark(page) == null) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Skipping " + TableUtil.unreverseUrl(key) + "; different batch id (" + mark + ")");
+ LOG.debug("Skipping " + TableUtil.unreverseUrl(key)
+ + "; not fetched yet");
}
return;
}
@@ -247,8 +250,9 @@ public class ParserJob extends NutchTool
currentJob = new NutchJob(getConf(), "parse");
Collection<WebPage.Field> fields = getFields(currentJob);
- StorageUtils.initMapperJob(currentJob, fields, String.class, WebPage.class,
- ParserMapper.class);
+ MapFieldValueFilter<String, WebPage> batchIdFilter = getBatchIdFilter(batchId);
+ StorageUtils.initMapperJob(currentJob, fields, String.class, WebPage.class,
+ ParserMapper.class, batchIdFilter);
StorageUtils.initReducerJob(currentJob, IdentityPageReducer.class);
currentJob.setNumReduceTasks(0);
@@ -257,6 +261,20 @@ public class ParserJob extends NutchTool
return results;
}
+ private MapFieldValueFilter<String, WebPage> getBatchIdFilter(String batchId) {
+ if (batchId.equals(REPARSE.toString())
+ || batchId.equals(Nutch.ALL_CRAWL_ID.toString())) {
+ return null;
+ }
+ MapFieldValueFilter<String, WebPage> filter = new MapFieldValueFilter<String, WebPage>();
+ filter.setFieldName(WebPage.Field.MARKERS.toString());
+ filter.setFilterOp(FilterOp.EQUALS);
+ filter.setFilterIfMissing(true);
+ filter.setMapKey(Mark.FETCH_MARK.getName());
+ filter.getOperands().add(new Utf8(batchId));
+ return filter;
+ }
+
public int parse(String batchId, boolean shouldResume, boolean force) throws Exception {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Modified: nutch/branches/2.x/src/java/org/apache/nutch/storage/Mark.java
URL:
http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/storage/Mark.java?rev=1594813&r1=1594812&r2=1594813&view=diff
==============================================================================
--- nutch/branches/2.x/src/java/org/apache/nutch/storage/Mark.java (original)
+++ nutch/branches/2.x/src/java/org/apache/nutch/storage/Mark.java Thu May 15 08:14:38 2014
@@ -55,4 +55,8 @@ public enum Mark {
}
return null;
}
+
+ public Utf8 getName() {
+ return name;
+ }
}
Modified: nutch/branches/2.x/src/java/org/apache/nutch/storage/StorageUtils.java
URL:
http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/storage/StorageUtils.java?rev=1594813&r1=1594812&r2=1594813&view=diff
==============================================================================
--- nutch/branches/2.x/src/java/org/apache/nutch/storage/StorageUtils.java (original)
+++ nutch/branches/2.x/src/java/org/apache/nutch/storage/StorageUtils.java Thu May 15 08:14:38 2014
@@ -16,6 +16,7 @@
******************************************************************************/
package org.apache.nutch.storage;
+import org.apache.gora.filter.Filter;
import org.apache.gora.mapreduce.GoraMapper;
import org.apache.gora.mapreduce.GoraOutputFormat;
import org.apache.gora.mapreduce.GoraReducer;
@@ -115,17 +116,42 @@ public class StorageUtils {
Class<K> outKeyClass, Class<V> outValueClass,
Class<? extends GoraMapper<String, WebPage, K, V>> mapperClass,
Class<? extends Partitioner<K, V>> partitionerClass, boolean reuseObjects)
- throws ClassNotFoundException, IOException {
+ throws ClassNotFoundException, IOException {
+ initMapperJob(job, fields, outKeyClass, outValueClass, mapperClass,
+ partitionerClass, null, reuseObjects);
+ }
+
+ public static <K, V> void initMapperJob(Job job,
+ Collection<WebPage.Field> fields, Class<K> outKeyClass,
+ Class<V> outValueClass,
+ Class<? extends GoraMapper<String, WebPage, K, V>> mapperClass,
+ Class<? extends Partitioner<K, V>> partitionerClass,
+ Filter<String, WebPage> filter, boolean reuseObjects)
+ throws ClassNotFoundException, IOException {
DataStore<String, WebPage> store = createWebStore(job.getConfiguration(),
String.class, WebPage.class);
- if (store==null) throw new RuntimeException("Could not create datastore");
+ if (store == null)
+ throw new RuntimeException("Could not create datastore");
Query<String, WebPage> query = store.newQuery();
query.setFields(toStringArray(fields));
- GoraMapper.initMapperJob(job, query, store,
- outKeyClass, outValueClass, mapperClass, partitionerClass, reuseObjects);
+ if (filter != null) {
+ query.setFilter(filter);
+ }
+ GoraMapper.initMapperJob(job, query, store, outKeyClass, outValueClass,
+ mapperClass, partitionerClass, reuseObjects);
GoraOutputFormat.setOutput(job, store, true);
}
+ public static <K, V> void initMapperJob(Job job,
+ Collection<WebPage.Field> fields, Class<K> outKeyClass,
+ Class<V> outValueClass,
+ Class<? extends GoraMapper<String, WebPage, K, V>> mapperClass,
+ Filter<String, WebPage> filter) throws ClassNotFoundException,
+ IOException {
+ initMapperJob(job, fields, outKeyClass, outValueClass, mapperClass, null,
+ filter, true);
+ }
+
public static <K, V> void initReducerJob(Job job,
Class<? extends GoraReducer<K, V, String, WebPage>> reducerClass)
throws ClassNotFoundException, GoraException {