Author: ferdy
Date: Mon May 7 15:30:38 2012
New Revision: 1335065
URL: http://svn.apache.org/viewvc?rev=1335065&view=rev
Log:
NUTCH-1356 ParseUtil: use ExecutorService instead of manual thread handling.
Modified:
nutch/branches/nutchgora/CHANGES.txt
nutch/branches/nutchgora/src/java/org/apache/nutch/parse/ParseUtil.java
Modified: nutch/branches/nutchgora/CHANGES.txt
URL:
http://svn.apache.org/viewvc/nutch/branches/nutchgora/CHANGES.txt?rev=1335065&r1=1335064&r2=1335065&view=diff
==============================================================================
--- nutch/branches/nutchgora/CHANGES.txt (original)
+++ nutch/branches/nutchgora/CHANGES.txt Mon May 7 15:30:38 2012
@@ -1,6 +1,8 @@
Nutch Change Log
Release nutchgora - Current Development
+* NUTCH-1356 ParseUtil use ExecutorService instead of manually thread
handling. (ferdy)
+
* NUTCH-1355 nutchgora Configure minimum throughput for fetcher
* NUTCH-1354 nutchgora support fetcher.queue.depth.multiplier property (ferdy)
Modified:
nutch/branches/nutchgora/src/java/org/apache/nutch/parse/ParseUtil.java
URL:
http://svn.apache.org/viewvc/nutch/branches/nutchgora/src/java/org/apache/nutch/parse/ParseUtil.java?rev=1335065&r1=1335064&r2=1335065&view=diff
==============================================================================
--- nutch/branches/nutchgora/src/java/org/apache/nutch/parse/ParseUtil.java
(original)
+++ nutch/branches/nutchgora/src/java/org/apache/nutch/parse/ParseUtil.java Mon
May 7 15:30:38 2012
@@ -20,13 +20,12 @@ package org.apache.nutch.parse;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.ByteBuffer;
-import java.util.concurrent.FutureTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import org.apache.avro.util.Utf8;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.StringUtils;
@@ -39,10 +38,13 @@ import org.apache.nutch.net.URLFilterExc
import org.apache.nutch.net.URLFilters;
import org.apache.nutch.net.URLNormalizers;
import org.apache.nutch.storage.Mark;
-import org.apache.nutch.storage.ParseStatus;
import org.apache.nutch.storage.WebPage;
import org.apache.nutch.util.TableUtil;
import org.apache.nutch.util.URLUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* A Utility class containing methods to simply perform parsing utilities such
@@ -58,8 +60,9 @@ public class ParseUtil extends Configure
/* our log stream */
public static final Logger LOG = LoggerFactory.getLogger(ParseUtil.class);
+ private static final int DEFAULT_MAX_PARSE_TIME = 30;
+
private Configuration conf;
-
private Signature sig;
private URLFilters filters;
private URLNormalizers normalizers;
@@ -67,7 +70,9 @@ public class ParseUtil extends Configure
private boolean ignoreExternalLinks;
private ParserFactory parserFactory;
/** Parser timeout set to 30 sec by default. Set -1 to deactivate **/
- private int MAX_PARSE_TIME = 30;
+ private int maxParseTime;
+ private ExecutorService executorService;
+
/**
*
* @param conf
@@ -86,13 +91,15 @@ public class ParseUtil extends Configure
public void setConf(Configuration conf) {
this.conf = conf;
parserFactory = new ParserFactory(conf);
- MAX_PARSE_TIME=conf.getInt("parser.timeout", 30);
+ maxParseTime=conf.getInt("parser.timeout", DEFAULT_MAX_PARSE_TIME);
sig = SignatureFactory.getSignature(conf);
filters = new URLFilters(conf);
normalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_OUTLINK);
int maxOutlinksPerPage = conf.getInt("db.max.outlinks.per.page", 100);
maxOutlinks = (maxOutlinksPerPage < 0) ? Integer.MAX_VALUE :
maxOutlinksPerPage;
ignoreExternalLinks = conf.getBoolean("db.ignore.external.links", false);
+ executorService = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
+ .setNameFormat("parse-%d").setDaemon(true).build());
}
/**
@@ -118,7 +125,7 @@ public class ParseUtil extends Configure
}
Parse parse = null;
- if (MAX_PARSE_TIME!=-1)
+ if (maxParseTime!=-1)
parse = runParser(parsers[i], url, page);
else
parse = parsers[i].getParse(url, page);
@@ -133,25 +140,19 @@ public class ParseUtil extends Configure
return ParseStatusUtils.getEmptyParse(new ParseException("Unable to
successfully parse content"), null);
}
- private Parse runParser(Parser p, String url, WebPage page) {
- ParseCallable pc = new ParseCallable(p, page, url);
- FutureTask<Parse> task = new FutureTask<Parse>(pc);
- Parse res = null;
- Thread t = new Thread(task);
- t.start();
- try {
- res = task.get(MAX_PARSE_TIME, TimeUnit.SECONDS);
- } catch (TimeoutException e) {
- LOG.warn("TIMEOUT parsing " + url + " with " + p);
- } catch (Exception e) {
- task.cancel(true);
- res = null;
- t.interrupt();
- } finally {
- t = null;
- pc = null;
- }
- return res;
+ private Parse runParser(Parser p, String url, WebPage page) {
+ ParseCallable pc = new ParseCallable(p, page, url);
+ Future<Parse> task = executorService.submit(pc);
+ Parse res = null;
+ try {
+ res = task.get(maxParseTime, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ LOG.warn("Error parsing " + url, e);
+ task.cancel(true);
+ } finally {
+ pc = null;
+ }
+ return res;
}
/**