Author: ferdy
Date: Mon May  7 15:30:38 2012
New Revision: 1335065

URL: http://svn.apache.org/viewvc?rev=1335065&view=rev
Log:
NUTCH-1356 ParseUtil uses ExecutorService instead of manual thread handling.

Modified:
    nutch/branches/nutchgora/CHANGES.txt
    nutch/branches/nutchgora/src/java/org/apache/nutch/parse/ParseUtil.java

Modified: nutch/branches/nutchgora/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/nutch/branches/nutchgora/CHANGES.txt?rev=1335065&r1=1335064&r2=1335065&view=diff
==============================================================================
--- nutch/branches/nutchgora/CHANGES.txt (original)
+++ nutch/branches/nutchgora/CHANGES.txt Mon May  7 15:30:38 2012
@@ -1,6 +1,8 @@
 Nutch Change Log
 
 Release nutchgora - Current Development
+* NUTCH-1356 ParseUtil use ExecutorService instead of manually thread 
handling. (ferdy)
+
 * NUTCH-1355 nutchgora Configure minimum throughput for fetcher
 
 * NUTCH-1354 nutchgora support fetcher.queue.depth.multiplier property (ferdy)

Modified: 
nutch/branches/nutchgora/src/java/org/apache/nutch/parse/ParseUtil.java
URL: 
http://svn.apache.org/viewvc/nutch/branches/nutchgora/src/java/org/apache/nutch/parse/ParseUtil.java?rev=1335065&r1=1335064&r2=1335065&view=diff
==============================================================================
--- nutch/branches/nutchgora/src/java/org/apache/nutch/parse/ParseUtil.java 
(original)
+++ nutch/branches/nutchgora/src/java/org/apache/nutch/parse/ParseUtil.java Mon 
May  7 15:30:38 2012
@@ -20,13 +20,12 @@ package org.apache.nutch.parse;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.nio.ByteBuffer;
-import java.util.concurrent.FutureTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 import org.apache.avro.util.Utf8;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.util.StringUtils;
@@ -39,10 +38,13 @@ import org.apache.nutch.net.URLFilterExc
 import org.apache.nutch.net.URLFilters;
 import org.apache.nutch.net.URLNormalizers;
 import org.apache.nutch.storage.Mark;
-import org.apache.nutch.storage.ParseStatus;
 import org.apache.nutch.storage.WebPage;
 import org.apache.nutch.util.TableUtil;
 import org.apache.nutch.util.URLUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * A Utility class containing methods to simply perform parsing utilities such
@@ -58,8 +60,9 @@ public class ParseUtil extends Configure
   /* our log stream */
   public static final Logger LOG = LoggerFactory.getLogger(ParseUtil.class);
 
+  private static final int DEFAULT_MAX_PARSE_TIME = 30;
+  
   private Configuration conf;
-
   private Signature sig;
   private URLFilters filters;
   private URLNormalizers normalizers;
@@ -67,7 +70,9 @@ public class ParseUtil extends Configure
   private boolean ignoreExternalLinks;
   private ParserFactory parserFactory;
   /** Parser timeout set to 30 sec by default. Set -1 to deactivate **/
-  private int MAX_PARSE_TIME = 30;
+  private int maxParseTime;
+  private ExecutorService executorService;
+  
   /**
    *
    * @param conf
@@ -86,13 +91,15 @@ public class ParseUtil extends Configure
   public void setConf(Configuration conf) {
     this.conf = conf;
     parserFactory = new ParserFactory(conf);
-    MAX_PARSE_TIME=conf.getInt("parser.timeout", 30);
+    maxParseTime=conf.getInt("parser.timeout", DEFAULT_MAX_PARSE_TIME);
     sig = SignatureFactory.getSignature(conf);
     filters = new URLFilters(conf);
     normalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_OUTLINK);
     int maxOutlinksPerPage = conf.getInt("db.max.outlinks.per.page", 100);
     maxOutlinks = (maxOutlinksPerPage < 0) ? Integer.MAX_VALUE : 
maxOutlinksPerPage;
     ignoreExternalLinks = conf.getBoolean("db.ignore.external.links", false);
+    executorService = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
+      .setNameFormat("parse-%d").setDaemon(true).build());
   }
 
   /**
@@ -118,7 +125,7 @@ public class ParseUtil extends Configure
       }
       Parse parse = null;
       
-      if (MAX_PARSE_TIME!=-1)
+      if (maxParseTime!=-1)
          parse = runParser(parsers[i], url, page);
       else 
          parse = parsers[i].getParse(url, page);
@@ -133,25 +140,19 @@ public class ParseUtil extends Configure
     return ParseStatusUtils.getEmptyParse(new ParseException("Unable to 
successfully parse content"), null);
   }
   
-  private Parse runParser(Parser p, String url, WebPage page) {
-         ParseCallable pc = new ParseCallable(p, page, url);
-         FutureTask<Parse> task = new FutureTask<Parse>(pc);
-         Parse res = null;
-         Thread t = new Thread(task);
-         t.start();
-         try {
-                 res = task.get(MAX_PARSE_TIME, TimeUnit.SECONDS);
-         } catch (TimeoutException e) {
-                 LOG.warn("TIMEOUT parsing " + url + " with " + p);
-         } catch (Exception e) {
-                 task.cancel(true);
-                 res = null;
-                 t.interrupt();
-         } finally {
-                 t = null;
-                 pc = null;
-         }
-         return res;
+  private Parse runParser(Parser p, String url, WebPage page) {    
+    ParseCallable pc = new ParseCallable(p, page, url);
+    Future<Parse> task = executorService.submit(pc);
+    Parse res = null;
+    try {
+      res = task.get(maxParseTime, TimeUnit.SECONDS);
+    } catch (Exception e) {
+      LOG.warn("Error parsing " + url, e);
+      task.cancel(true);
+    } finally {
+      pc = null;
+    }
+    return res;
   }
 
   /**


Reply via email to