Modified: nutch/branches/2.x/src/java/org/apache/nutch/fetcher/FetcherReducer.java URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/fetcher/FetcherReducer.java?rev=1650447&r1=1650446&r2=1650447&view=diff ============================================================================== --- nutch/branches/2.x/src/java/org/apache/nutch/fetcher/FetcherReducer.java (original) +++ nutch/branches/2.x/src/java/org/apache/nutch/fetcher/FetcherReducer.java Fri Jan 9 06:34:33 2015 @@ -46,20 +46,23 @@ import java.util.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -public class FetcherReducer -extends GoraReducer<IntWritable, FetchEntry, String, WebPage> { +public class FetcherReducer extends + GoraReducer<IntWritable, FetchEntry, String, WebPage> { public static final Logger LOG = FetcherJob.LOG; private final AtomicInteger activeThreads = new AtomicInteger(0); private final AtomicInteger spinWaiting = new AtomicInteger(0); - private final long start = System.currentTimeMillis(); // start time of fetcher run + private final long start = System.currentTimeMillis(); // start time of + // fetcher run private final AtomicLong lastRequestStart = new AtomicLong(start); - private final AtomicLong bytes = new AtomicLong(0); // total bytes fetched - private final AtomicInteger pages = new AtomicInteger(0); // total pages fetched - private final AtomicInteger errors = new AtomicInteger(0); // total pages errored + private final AtomicLong bytes = new AtomicLong(0); // total bytes fetched + private final AtomicInteger pages = new AtomicInteger(0); // total pages + // fetched + private final AtomicInteger errors = new AtomicInteger(0); // total pages + // errored private QueueFeeder feeder; @@ -89,9 +92,10 @@ extends GoraReducer<IntWritable, FetchEn this.queueID = queueID; } - /** Create an item. Queue id will be created based on <code>queueMode</code> - * argument, either as a protocol + hostname pair, protocol + IP - * address pair or protocol+domain pair. + /** + * Create an item. Queue id will be created based on <code>queueMode</code> + * argument, either as a protocol + hostname pair, protocol + IP address + * pair or protocol+domain pair. */ public static FetchItem create(String url, WebPage page, String queueMode) { String queueID; @@ -113,19 +117,18 @@ extends GoraReducer<IntWritable, FetchEn LOG.warn("Unable to resolve: " + u.getHost() + ", skipping."); return null; } - } - else if (FetchItemQueues.QUEUE_MODE_DOMAIN.equalsIgnoreCase(queueMode)){ + } else if (FetchItemQueues.QUEUE_MODE_DOMAIN.equalsIgnoreCase(queueMode)) { host = URLUtil.getDomainName(u); if (host == null) { - LOG.warn("Unknown domain for url: " + url + ", using URL string as key"); - host=u.toExternalForm(); + LOG.warn("Unknown domain for url: " + url + + ", using URL string as key"); + host = u.toExternalForm(); } - } - else { + } else { host = u.getHost(); if (host == null) { LOG.warn("Unknown host for url: " + url + ", using URL string as key"); - host=u.toExternalForm(); + host = u.toExternalForm(); } } queueID = proto + "://" + host.toLowerCase(); @@ -140,19 +143,22 @@ extends GoraReducer<IntWritable, FetchEn } /** - * This class handles FetchItems which come from the same host ID (be it - * a proto/hostname or proto/IP pair). It also keeps track of requests in + * This class handles FetchItems which come from the same host ID (be it a + * proto/hostname or proto/IP pair). It also keeps track of requests in * progress and elapsed time between requests. */ private static class FetchItemQueue { - List<FetchItem> queue = Collections.synchronizedList(new LinkedList<FetchItem>()); - Set<FetchItem> inProgress = Collections.synchronizedSet(new HashSet<FetchItem>()); + List<FetchItem> queue = Collections + .synchronizedList(new LinkedList<FetchItem>()); + Set<FetchItem> inProgress = Collections + .synchronizedSet(new HashSet<FetchItem>()); AtomicLong nextFetchTime = new AtomicLong(); long crawlDelay; long minCrawlDelay; int maxThreads; - public FetchItemQueue(Configuration conf, int maxThreads, long crawlDelay, long minCrawlDelay) { + public FetchItemQueue(Configuration conf, int maxThreads, long crawlDelay, + long minCrawlDelay) { this.maxThreads = maxThreads; this.crawlDelay = crawlDelay; this.minCrawlDelay = minCrawlDelay; @@ -176,27 +182,34 @@ extends GoraReducer<IntWritable, FetchEn } public void addFetchItem(FetchItem it) { - if (it == null) return; + if (it == null) + return; queue.add(it); } @SuppressWarnings("unused") public void addInProgressFetchItem(FetchItem it) { - if (it == null) return; + if (it == null) + return; inProgress.add(it); } public FetchItem getFetchItem() { - if (inProgress.size() >= maxThreads) return null; + if (inProgress.size() >= maxThreads) + return null; final long now = System.currentTimeMillis(); - if (nextFetchTime.get() > now) return null; + if (nextFetchTime.get() > now) + return null; FetchItem it = null; - if (queue.size() == 0) return null; + if (queue.size() == 0) + return null; try { it = queue.remove(0); inProgress.add(it); } catch (final Exception e) { - LOG.error("Cannot remove FetchItem from queue or cannot add it to inProgress queue", e); + LOG.error( + "Cannot remove FetchItem from queue or cannot add it to inProgress queue", + e); } return it; } @@ -220,11 +233,12 @@ extends GoraReducer<IntWritable, FetchEn private void setEndTime(long endTime, boolean asap) { if (!asap) - nextFetchTime.set(endTime + (maxThreads > 1 ? minCrawlDelay : crawlDelay)); + nextFetchTime.set(endTime + + (maxThreads > 1 ? minCrawlDelay : crawlDelay)); else nextFetchTime.set(endTime); } - + public synchronized int emptyQueue() { int presize = queue.size(); queue.clear(); @@ -247,7 +261,7 @@ extends GoraReducer<IntWritable, FetchEn long minCrawlDelay; Configuration conf; long timelimit = -1; - + boolean useHostSettings = false; HostDb hostDb = null; @@ -260,25 +274,29 @@ extends GoraReducer<IntWritable, FetchEn this.maxThreads = conf.getInt("fetcher.threads.per.queue", 1); queueMode = conf.get("fetcher.queue.mode", QUEUE_MODE_HOST); // check that the mode is known - if (!queueMode.equals(QUEUE_MODE_IP) && !queueMode.equals(QUEUE_MODE_DOMAIN) + if (!queueMode.equals(QUEUE_MODE_IP) + && !queueMode.equals(QUEUE_MODE_DOMAIN) && !queueMode.equals(QUEUE_MODE_HOST)) { - LOG.error("Unknown partition mode : " + queueMode + " - forcing to byHost"); + LOG.error("Unknown partition mode : " + queueMode + + " - forcing to byHost"); queueMode = QUEUE_MODE_HOST; } - LOG.info("Using queue mode : "+queueMode); - - // Optionally enable host specific queue behavior + LOG.info("Using queue mode : " + queueMode); + + // Optionally enable host specific queue behavior if (queueMode.equals(QUEUE_MODE_HOST)) { - useHostSettings = conf.getBoolean("fetcher.queue.use.host.settings", false); + useHostSettings = conf.getBoolean("fetcher.queue.use.host.settings", + false); if (useHostSettings) { LOG.info("Host specific queue settings enabled."); // Initialize the HostDb if we need it. hostDb = new HostDb(conf); } } - + this.crawlDelay = (long) (conf.getFloat("fetcher.server.delay", 1.0f) * 1000); - this.minCrawlDelay = (long) (conf.getFloat("fetcher.server.min.delay", 0.0f) * 1000); + this.minCrawlDelay = (long) (conf.getFloat("fetcher.server.min.delay", + 0.0f) * 1000); this.timelimit = conf.getLong("fetcher.timelimit", -1); } @@ -292,7 +310,8 @@ extends GoraReducer<IntWritable, FetchEn public void addFetchItem(String url, WebPage page) { final FetchItem it = FetchItem.create(url, page, queueMode); - if (it != null) addFetchItem(it); + if (it != null) + addFetchItem(it); } public synchronized void addFetchItem(FetchItem it) { @@ -321,19 +340,18 @@ extends GoraReducer<IntWritable, FetchEn if (useHostSettings) { // Use host specific queue settings (if defined in the host table) try { - String hostname = id.substring(id.indexOf("://")+3); + String hostname = id.substring(id.indexOf("://") + 3); Host host = hostDb.getByHostName(hostname); if (host != null) { - fiq = new FetchItemQueue(conf, - host.getInt("q_mt", maxThreads), - host.getLong("q_cd", crawlDelay), - host.getLong("q_mcd", minCrawlDelay)); + fiq = new FetchItemQueue(conf, host.getInt("q_mt", maxThreads), + host.getLong("q_cd", crawlDelay), host.getLong("q_mcd", + minCrawlDelay)); } - + } catch (IOException e) { LOG.error("Error while trying to access host settings", e); } - } + } if (fiq == null) { // Use queue defaults fiq = new FetchItemQueue(conf, maxThreads, crawlDelay, minCrawlDelay); @@ -344,8 +362,8 @@ extends GoraReducer<IntWritable, FetchEn } public synchronized FetchItem getFetchItem() { - final Iterator<Map.Entry<String, FetchItemQueue>> it = - queues.entrySet().iterator(); + final Iterator<Map.Entry<String, FetchItemQueue>> it = queues.entrySet() + .iterator(); while (it.hasNext()) { final FetchItemQueue fiq = it.next().getValue(); // reap empty queues @@ -362,19 +380,19 @@ extends GoraReducer<IntWritable, FetchEn } return null; } - + public synchronized int checkTimelimit() { if (System.currentTimeMillis() >= timelimit && timelimit != -1) { return emptyQueues(); } return 0; } - public synchronized void dump() { for (final String id : queues.keySet()) { final FetchItemQueue fiq = queues.get(id); - if (fiq.getQueueSize() == 0) continue; + if (fiq.getQueueSize() == 0) + continue; LOG.info("* queue: " + id); fiq.dump(); } @@ -383,11 +401,12 @@ extends GoraReducer<IntWritable, FetchEn // empties the queues (used by timebomb and throughput threshold) public synchronized int emptyQueues() { int count = 0; - + // emptying the queues for (String id : queues.keySet()) { FetchItemQueue fiq = queues.get(id); - if (fiq.getQueueSize() == 0) continue; + if (fiq.getQueueSize() == 0) + continue; LOG.info("* queue: " + id + " >> dropping! "); int deleted = fiq.emptyQueue(); for (int i = 0; i < deleted; i++) { @@ -398,7 +417,8 @@ extends GoraReducer<IntWritable, FetchEn // there might also be a case where totalsize !=0 but number of queues // == 0 // in which case we simply force it to 0 to avoid blocking - if (totalSize.get() != 0 && queues.size() == 0) totalSize.set(0); + if (totalSize.get() != 0 && queues.size() == 0) + totalSize.set(0); return count; } @@ -420,8 +440,8 @@ extends GoraReducer<IntWritable, FetchEn private final boolean ignoreExternalLinks; public FetcherThread(Context context, int num) { - this.setDaemon(true); // don't hang JVM on exit - this.setName("FetcherThread" + num); // use an informative name + this.setDaemon(true); // don't hang JVM on exit + this.setName("FetcherThread" + num); // use an informative name this.context = context; Configuration conf = context.getConfiguration(); this.urlFilters = new URLFilters(conf); @@ -430,7 +450,8 @@ extends GoraReducer<IntWritable, FetchEn this.maxCrawlDelay = conf.getInt("fetcher.max.crawl.delay", 30) * 1000; // backward-compatible default setting this.byIP = conf.getBoolean("fetcher.threads.per.host.by.ip", true); - this.ignoreExternalLinks = conf.getBoolean("db.ignore.external.links", false); + this.ignoreExternalLinks = conf.getBoolean("db.ignore.external.links", + false); } @Override @@ -446,13 +467,15 @@ extends GoraReducer<IntWritable, FetchEn if (fit == null) { if (feeder.isAlive() || fetchQueues.getTotalSize() > 0) { if (LOG.isDebugEnabled()) { - LOG.debug(getName() + " fetchQueues.getFetchItem() was null, spin-waiting ..."); + LOG.debug(getName() + + " fetchQueues.getFetchItem() was null, spin-waiting ..."); } // spin-wait. spinWaiting.incrementAndGet(); try { Thread.sleep(500); - } catch (final Exception e) {} + } catch (final Exception e) { + } spinWaiting.decrementAndGet(); continue; } else { @@ -467,12 +490,13 @@ extends GoraReducer<IntWritable, FetchEn reprUrl = TableUtil.toString(fit.page.getReprUrl()); } try { - LOG.info("fetching " + fit.url + " (queue crawl delay=" + - fetchQueues.getFetchItemQueue(fit.queueID).crawlDelay + "ms)"); + LOG.info("fetching " + fit.url + " (queue crawl delay=" + + fetchQueues.getFetchItemQueue(fit.queueID).crawlDelay + "ms)"); // fetch the page final Protocol protocol = this.protocolFactory.getProtocol(fit.url); - final BaseRobotRules rules = protocol.getRobotRules(fit.url, fit.page); + final BaseRobotRules rules = protocol.getRobotRules(fit.url, + fit.page); if (!rules.isAllowed(fit.u.toString())) { // unblock fetchQueues.finishFetchItem(fit, true); @@ -487,41 +511,49 @@ extends GoraReducer<IntWritable, FetchEn if (rules.getCrawlDelay() > maxCrawlDelay && maxCrawlDelay >= 0) { // unblock fetchQueues.finishFetchItem(fit, true); - LOG.debug("Crawl-Delay for " + fit.url + " too long (" + rules.getCrawlDelay() + "), skipping"); - output(fit, null, ProtocolStatusUtils.STATUS_ROBOTS_DENIED, CrawlStatus.STATUS_GONE); + LOG.debug("Crawl-Delay for " + fit.url + " too long (" + + rules.getCrawlDelay() + "), skipping"); + output(fit, null, ProtocolStatusUtils.STATUS_ROBOTS_DENIED, + CrawlStatus.STATUS_GONE); continue; } else { - final FetchItemQueue fiq = fetchQueues.getFetchItemQueue(fit.queueID); + final FetchItemQueue fiq = fetchQueues + .getFetchItemQueue(fit.queueID); fiq.crawlDelay = rules.getCrawlDelay(); if (LOG.isDebugEnabled()) { - LOG.info("Crawl delay for queue: " + fit.queueID + " is set to " + fiq.crawlDelay + " as per robots.txt. url: " + fit.url); + LOG.info("Crawl delay for queue: " + fit.queueID + + " is set to " + fiq.crawlDelay + + " as per robots.txt. url: " + fit.url); } } } - final ProtocolOutput output = protocol.getProtocolOutput(fit.url, fit.page); + final ProtocolOutput output = protocol.getProtocolOutput(fit.url, + fit.page); final ProtocolStatus status = output.getStatus(); final Content content = output.getContent(); // unblock queue fetchQueues.finishFetchItem(fit); - context.getCounter("FetcherStatus", ProtocolStatusUtils.getName(status.getCode())).increment(1); + context.getCounter("FetcherStatus", + ProtocolStatusUtils.getName(status.getCode())).increment(1); int length = 0; - if (content!=null && content.getContent()!=null) length= content.getContent().length; + if (content != null && content.getContent() != null) + length = content.getContent().length; updateStatus(length); - switch(status.getCode()) { + switch (status.getCode()) { case ProtocolStatusCodes.WOULDBLOCK: // retry ? fetchQueues.addFetchItem(fit); break; - case ProtocolStatusCodes.SUCCESS: // got a page + case ProtocolStatusCodes.SUCCESS: // got a page output(fit, content, status, CrawlStatus.STATUS_FETCHED); break; - case ProtocolStatusCodes.MOVED: // redirect + case ProtocolStatusCodes.MOVED: // redirect case ProtocolStatusCodes.TEMP_MOVED: byte code; boolean temp; @@ -533,18 +565,19 @@ extends GoraReducer<IntWritable, FetchEn temp = true; } final String newUrl = ProtocolStatusUtils.getMessage(status); - handleRedirect(fit.url, newUrl, temp, FetcherJob.PROTOCOL_REDIR, fit.page); + handleRedirect(fit.url, newUrl, temp, FetcherJob.PROTOCOL_REDIR, + fit.page); output(fit, content, status, code); break; case ProtocolStatusCodes.EXCEPTION: logFetchFailure(fit.url, ProtocolStatusUtils.getMessage(status)); /* FALLTHROUGH */ - case ProtocolStatusCodes.RETRY: // retry + case ProtocolStatusCodes.RETRY: // retry case ProtocolStatusCodes.BLOCKED: output(fit, null, status, CrawlStatus.STATUS_RETRY); break; - case ProtocolStatusCodes.GONE: // gone + case ProtocolStatusCodes.GONE: // gone case ProtocolStatusCodes.NOTFOUND: case ProtocolStatusCodes.ACCESS_DENIED: case ProtocolStatusCodes.ROBOTS_DENIED: @@ -562,7 +595,7 @@ extends GoraReducer<IntWritable, FetchEn output(fit, null, status, CrawlStatus.STATUS_RETRY); } - } catch (final Throwable t) { // unexpected exception + } catch (final Throwable t) { // unexpected exception // unblock fetchQueues.finishFetchItem(fit); LOG.error("Unexpected error for " + fit.url, t); @@ -574,15 +607,17 @@ extends GoraReducer<IntWritable, FetchEn } catch (final Throwable e) { LOG.error("fetcher throwable caught", e); } finally { - if (fit != null) fetchQueues.finishFetchItem(fit); + if (fit != null) + fetchQueues.finishFetchItem(fit); activeThreads.decrementAndGet(); // count threads - LOG.info("-finishing thread " + getName() + ", activeThreads=" + activeThreads); + LOG.info("-finishing thread " + getName() + ", activeThreads=" + + activeThreads); } } - private void handleRedirect(String url, String newUrl, - boolean temp, String redirType, WebPage page) - throws URLFilterException, IOException, InterruptedException { + private void handleRedirect(String url, String newUrl, boolean temp, + String redirType, WebPage page) throws URLFilterException, IOException, + InterruptedException { newUrl = normalizers.normalize(newUrl, URLNormalizers.SCOPE_FETCHER); newUrl = urlFilters.filter(newUrl); if (newUrl == null || newUrl.equals(url)) { @@ -590,7 +625,7 @@ extends GoraReducer<IntWritable, FetchEn } if (ignoreExternalLinks) { - String toHost = new URL(newUrl).getHost().toLowerCase(); + String toHost = new URL(newUrl).getHost().toLowerCase(); String fromHost = new URL(url).getHost().toLowerCase(); if (toHost == null || !toHost.equals(fromHost)) { // external links @@ -606,22 +641,20 @@ extends GoraReducer<IntWritable, FetchEn } else { page.setReprUrl(new Utf8(reprUrl)); if (LOG.isDebugEnabled()) { - LOG.debug(" - " + redirType + " redirect to " + - reprUrl + " (fetching later)"); + LOG.debug(" - " + redirType + " redirect to " + reprUrl + + " (fetching later)"); } } } - private void updateStatus(int bytesInPage) throws IOException { pages.incrementAndGet(); bytes.addAndGet(bytesInPage); } - private void output(FetchItem fit, Content content, - ProtocolStatus pstatus, byte status) - throws IOException, InterruptedException { - fit.page.setStatus((int)status); + private void output(FetchItem fit, Content content, ProtocolStatus pstatus, + byte status) throws IOException, InterruptedException { + fit.page.setStatus((int) status); final long prevFetchTime = fit.page.getFetchTime(); fit.page.setPrevFetchTime(prevFetchTime); fit.page.setFetchTime(System.currentTimeMillis()); @@ -638,13 +671,15 @@ extends GoraReducer<IntWritable, FetchEn String key = TableUtil.reverseUrl(fit.url); if (parse) { - if (!skipTruncated || (skipTruncated && !ParserJob.isTruncated(fit.url, fit.page))) { + if (!skipTruncated + || (skipTruncated && !ParserJob.isTruncated(fit.url, fit.page))) { parseUtil.process(key, fit.page); } } - //remove content if storingContent is false. Content is added to fit.page above - //for ParseUtil be able to parse it. - if(content != null && !storingContent){ + // remove content if storingContent is false. Content is added to fit.page + // above + // for ParseUtil be able to parse it. + if (content != null && !storingContent) { fit.page.setContent(ByteBuffer.wrap(new byte[0])); } context.write(key, fit.page); @@ -656,10 +691,9 @@ extends GoraReducer<IntWritable, FetchEn } } - /** - * This class feeds the queues with input items, and re-fills them as - * items are consumed by FetcherThread-s. + * This class feeds the queues with input items, and re-fills them as items + * are consumed by FetcherThread-s. */ private static class QueueFeeder extends Thread { private final Context context; @@ -669,9 +703,8 @@ extends GoraReducer<IntWritable, FetchEn boolean hasMore; private long timelimit = -1; - public QueueFeeder(Context context, - FetchItemQueues queues, int size) - throws IOException, InterruptedException { + public QueueFeeder(Context context, FetchItemQueues queues, int size) + throws IOException, InterruptedException { this.context = context; this.queues = queues; this.size = size; @@ -681,8 +714,9 @@ extends GoraReducer<IntWritable, FetchEn if (hasMore) { currentIter = context.getValues().iterator(); } - // the value of the time limit is either -1 or the time where it should finish - timelimit = context.getConfiguration().getLong("fetcher.timelimit", -1); + // the value of the time limit is either -1 or the time where it should + // finish + timelimit = context.getConfiguration().getLong("fetcher.timelimit", -1); } @Override @@ -709,7 +743,9 @@ extends GoraReducer<IntWritable, FetchEn // queues are full - spin-wait until they have some free space try { Thread.sleep(1000); - } catch (final Exception e) {}; + } catch (final Exception e) { + } + ; continue; } if (LOG.isDebugEnabled()) { @@ -717,8 +753,7 @@ extends GoraReducer<IntWritable, FetchEn } while (feed > 0 && currentIter.hasNext()) { FetchEntry entry = currentIter.next(); - final String url = - TableUtil.unreverseUrl(entry.getKey()); + final String url = TableUtil.unreverseUrl(entry.getKey()); queues.addFetchItem(url, entry.getWebPage()); feed--; cnt++; @@ -735,22 +770,27 @@ extends GoraReducer<IntWritable, FetchEn LOG.error("QueueFeeder error reading input, record " + cnt, e); return; } - LOG.info("QueueFeeder finished: total " + cnt + " records. Hit by time limit :" - + timelimitcount); - context.getCounter("FetcherStatus","HitByTimeLimit-QueueFeeder").increment(timelimitcount); + LOG.info("QueueFeeder finished: total " + cnt + + " records. Hit by time limit :" + timelimitcount); + context.getCounter("FetcherStatus", "HitByTimeLimit-QueueFeeder") + .increment(timelimitcount); } } - private void reportAndLogStatus(Context context, float actualPages, + private void reportAndLogStatus(Context context, float actualPages, int actualBytes, int totalSize) throws IOException { StringBuilder status = new StringBuilder(); - long elapsed = (System.currentTimeMillis() - start)/1000; - status.append(spinWaiting).append("/").append(activeThreads).append(" spinwaiting/active, "); + long elapsed = (System.currentTimeMillis() - start) / 1000; + status.append(spinWaiting).append("/").append(activeThreads) + .append(" spinwaiting/active, "); status.append(pages).append(" pages, ").append(errors).append(" errors, "); - status.append(Math.round((((float)pages.get())*10)/elapsed)/10.0).append(" "); - status.append(Math.round((actualPages*10)/10.0)).append(" pages/s, "); - status.append(Math.round((((float)bytes.get())*8)/1024)/elapsed).append(" "); - status.append(Math.round(((float)actualBytes)*8)/1024).append(" kb/s, "); + status.append(Math.round((((float) pages.get()) * 10) / elapsed) / 10.0) + .append(" "); + status.append(Math.round((actualPages * 10) / 10.0)).append(" pages/s, "); + status.append(Math.round((((float) bytes.get()) * 8) / 1024) / elapsed) + .append(" "); + status.append(Math.round(((float) actualBytes) * 8) / 1024).append( + " kb/s, "); status.append(totalSize).append(" URLs in "); status.append(this.fetchQueues.getQueueCount()).append(" queues"); String toString = status.toString(); @@ -759,30 +799,30 @@ extends GoraReducer<IntWritable, FetchEn } @Override - public void run(Context context) - throws IOException, InterruptedException { + public void run(Context context) throws IOException, InterruptedException { Configuration conf = context.getConfiguration(); this.fetchQueues = new FetchItemQueues(conf); int threadCount = conf.getInt("fetcher.threads.fetch", 10); parse = conf.getBoolean(FetcherJob.PARSE_KEY, false); - storingContent=conf.getBoolean("fetcher.store.content", true); + storingContent = conf.getBoolean("fetcher.store.content", true); if (parse) { - skipTruncated=conf.getBoolean(ParserJob.SKIP_TRUNCATED, true); + skipTruncated = conf.getBoolean(ParserJob.SKIP_TRUNCATED, true); parseUtil = new ParseUtil(conf); } LOG.info("Fetcher: threads: " + threadCount); int maxFeedPerThread = conf.getInt("fetcher.queue.depth.multiplier", 50); - feeder = new QueueFeeder(context, fetchQueues, threadCount * maxFeedPerThread); + feeder = new QueueFeeder(context, fetchQueues, threadCount + * maxFeedPerThread); feeder.start(); - for (int i = 0; i < threadCount; i++) { // spawn threads + for (int i = 0; i < threadCount; i++) { // spawn threads FetcherThread ft = new FetcherThread(context, i); fetcherThreads.add(ft); ft.start(); } // select a timeout that avoids a task timeout - final long timeout = conf.getInt("mapred.task.timeout", 10*60*1000)/2; + final long timeout = conf.getInt("mapred.task.timeout", 10 * 60 * 1000) / 2; // Used for threshold check, holds pages and bytes processed in the last sec float pagesLastSec; @@ -790,48 +830,59 @@ extends GoraReducer<IntWritable, FetchEn int throughputThresholdCurrentSequence = 0; - int throughputThresholdPages = conf.getInt("fetcher.throughput.threshold.pages", -1); - if (LOG.isInfoEnabled()) { LOG.info("Fetcher: throughput threshold: " + throughputThresholdPages); } - int throughputThresholdSequence = conf.getInt("fetcher.throughput.threshold.sequence", 5); - if (LOG.isInfoEnabled()) { - LOG.info("Fetcher: throughput threshold sequence: " + throughputThresholdSequence); - } - long throughputThresholdTimeLimit = conf.getLong("fetcher.throughput.threshold.check.after", -1); - - do { // wait for threads to exit + int throughputThresholdPages = conf.getInt( + "fetcher.throughput.threshold.pages", -1); + if (LOG.isInfoEnabled()) { + LOG.info("Fetcher: throughput threshold: " + throughputThresholdPages); + } + int throughputThresholdSequence = conf.getInt( + "fetcher.throughput.threshold.sequence", 5); + if (LOG.isInfoEnabled()) { + LOG.info("Fetcher: throughput threshold sequence: " + + throughputThresholdSequence); + } + long throughputThresholdTimeLimit = conf.getLong( + "fetcher.throughput.threshold.check.after", -1); + + do { // wait for threads to exit pagesLastSec = pages.get(); - bytesLastSec = (int)bytes.get(); + bytesLastSec = (int) bytes.get(); final int secondsToSleep = 5; try { Thread.sleep(secondsToSleep * 1000); - } catch (InterruptedException e) {} + } catch (InterruptedException e) { + } - pagesLastSec = (pages.get() - pagesLastSec)/secondsToSleep; - bytesLastSec = ((int)bytes.get() - bytesLastSec)/secondsToSleep; + pagesLastSec = (pages.get() - pagesLastSec) / secondsToSleep; + bytesLastSec = ((int) bytes.get() - bytesLastSec) / secondsToSleep; int fetchQueuesTotalSize = fetchQueues.getTotalSize(); - reportAndLogStatus(context, pagesLastSec, bytesLastSec, fetchQueuesTotalSize); - + reportAndLogStatus(context, pagesLastSec, bytesLastSec, + fetchQueuesTotalSize); + boolean feederAlive = feeder.isAlive(); if (!feederAlive && fetchQueuesTotalSize < 5) { fetchQueues.dump(); } - + // check timelimit if (!feederAlive) { int hitByTimeLimit = fetchQueues.checkTimelimit(); if (hitByTimeLimit != 0) { - context.getCounter("FetcherStatus","HitByTimeLimit-Queues").increment(hitByTimeLimit); + context.getCounter("FetcherStatus", "HitByTimeLimit-Queues") + .increment(hitByTimeLimit); } } - + // if throughput threshold is enabled - if (throughputThresholdTimeLimit < System.currentTimeMillis() && throughputThresholdPages != -1) { + if (throughputThresholdTimeLimit < System.currentTimeMillis() + && throughputThresholdPages != -1) { // Check if we're dropping below the threshold if (pagesLastSec < throughputThresholdPages) { throughputThresholdCurrentSequence++; - LOG.warn(Integer.toString(throughputThresholdCurrentSequence) - + ": dropping below configured threshold of " + Integer.toString(throughputThresholdPages) + LOG.warn(Integer.toString(throughputThresholdCurrentSequence) + + ": dropping below configured threshold of " + + Integer.toString(throughputThresholdPages) + " pages per second"); // Quit if we dropped below threshold too many times @@ -841,17 +892,19 @@ extends GoraReducer<IntWritable, FetchEn // Disable the threshold checker throughputThresholdPages = -1; - // Empty the queues cleanly and get number of items that were dropped + // Empty the queues cleanly and get number of items that were + // dropped int hitByThrougputThreshold = fetchQueues.emptyQueues(); - if (hitByThrougputThreshold != 0) context.getCounter("FetcherStatus", - "hitByThrougputThreshold").increment(hitByThrougputThreshold); + if (hitByThrougputThreshold != 0) + context.getCounter("FetcherStatus", "hitByThrougputThreshold") + .increment(hitByThrougputThreshold); } } else { throughputThresholdCurrentSequence = 0; } } - + // some requests seem to hang, despite all intentions if ((System.currentTimeMillis() - lastRequestStart.get()) > timeout) { if (LOG.isWarnEnabled() && activeThreads.get() > 0) { @@ -859,7 +912,8 @@ extends GoraReducer<IntWritable, FetchEn for (int i = 0; i < fetcherThreads.size(); i++) { FetcherThread thread = fetcherThreads.get(i); if (thread.isAlive()) { - LOG.warn("Thread #" + i + " hung while processing " + thread.reprUrl); + LOG.warn("Thread #" + i + " hung while processing " + + thread.reprUrl); if (LOG.isDebugEnabled()) { StackTraceElement[] stack = thread.getStackTrace(); StringBuilder sb = new StringBuilder(); @@ -879,4 +933,3 @@ extends GoraReducer<IntWritable, FetchEn LOG.info("-activeThreads=" + activeThreads); } } -
Modified: nutch/branches/2.x/src/java/org/apache/nutch/host/HostDb.java URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/host/HostDb.java?rev=1650447&r1=1650446&r2=1650447&view=diff ============================================================================== --- nutch/branches/2.x/src/java/org/apache/nutch/host/HostDb.java (original) +++ nutch/branches/2.x/src/java/org/apache/nutch/host/HostDb.java Fri Jan 9 06:34:33 2015 @@ -37,21 +37,22 @@ import com.google.common.cache.RemovalLi import com.google.common.cache.RemovalNotification; /** - * A caching wrapper for the host datastore. + * A caching wrapper for the host datastore. */ public class HostDb implements Closeable { public static final Log LOG = LogFactory.getLog(HostDb.class); - + private static final class CacheHost { private final Host host; private final long timestamp; + public CacheHost(Host host, long timestamp) { this.host = host; this.timestamp = timestamp; - } + } } - private final static CacheHost NULL_HOST = new CacheHost(null,0); - + + private final static CacheHost NULL_HOST = new CacheHost(null, 0); private DataStore<String, Host> hostStore; @@ -61,7 +62,7 @@ public class HostDb implements Closeable public static final int DEFAULT_HOSTDB_CONCURRENCY_LEVEL = 8; private Cache<String, CacheHost> cache; - + private AtomicLong lastFlush; public HostDb(Configuration conf) throws GoraException { @@ -73,47 +74,43 @@ public class HostDb implements Closeable // Create a cache. // We add a removal listener to see if we need to flush the store, - // in order to adhere to the put-flush-get semantic + // in order to adhere to the put-flush-get semantic // ("read your own write") of DataStore. - + long lruSize = conf.getLong(HOSTDB_LRU_SIZE, DEFAULT_LRU_SIZE); - int concurrencyLevel = conf.getInt(HOSTDB_CONCURRENCY_LEVEL, + int concurrencyLevel = conf.getInt(HOSTDB_CONCURRENCY_LEVEL, DEFAULT_HOSTDB_CONCURRENCY_LEVEL); - RemovalListener<String, CacheHost> listener = - new RemovalListener<String, CacheHost>() { - @Override - public void onRemoval( - RemovalNotification<String, CacheHost> notification) { - CacheHost removeFromCacheHost = notification.getValue(); - if (removeFromCacheHost != NULL_HOST) { - if (removeFromCacheHost.timestamp < lastFlush.get()) { - try { - hostStore.flush(); - } catch (Exception e) { - throw new RuntimeException(e); - } - lastFlush.set(System.currentTimeMillis()); - } + RemovalListener<String, CacheHost> listener = new RemovalListener<String, CacheHost>() { + @Override + public void onRemoval(RemovalNotification<String, CacheHost> notification) { + CacheHost removeFromCacheHost = notification.getValue(); + if (removeFromCacheHost != NULL_HOST) { + if (removeFromCacheHost.timestamp < lastFlush.get()) { + try { + hostStore.flush(); + } catch (Exception e) { + throw new RuntimeException(e); } + lastFlush.set(System.currentTimeMillis()); } + } + } }; - - cache=CacheBuilder.newBuilder().maximumSize(lruSize) - .removalListener(listener).concurrencyLevel(concurrencyLevel) - .build(); + + cache = CacheBuilder.newBuilder().maximumSize(lruSize) + .removalListener(listener).concurrencyLevel(concurrencyLevel).build(); lastFlush = new AtomicLong(System.currentTimeMillis()); } - - public Host get(final String key) throws IOException { Callable<CacheHost> valueLoader = new Callable<CacheHost>() { @Override public CacheHost call() throws Exception { Host host = hostStore.get(key); - if (host == null) return NULL_HOST; + if (host == null) + return NULL_HOST; return new CacheHost(host, System.currentTimeMillis()); - } + } }; CacheHost cachedHost; try { @@ -127,14 +124,11 @@ public class HostDb implements Closeable return null; } } - - public Host getByHostName(String hostName) throws IOException { - return get(TableUtil.reverseHost(hostName)); + return get(TableUtil.reverseHost(hostName)); } - - + public void put(String key, Host host) throws IOException { cache.put(key, new CacheHost(host, System.currentTimeMillis())); hostStore.put(key, host); Modified: nutch/branches/2.x/src/java/org/apache/nutch/host/HostDbReader.java URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/host/HostDbReader.java?rev=1650447&r1=1650446&r2=1650447&view=diff ============================================================================== --- nutch/branches/2.x/src/java/org/apache/nutch/host/HostDbReader.java (original) +++ nutch/branches/2.x/src/java/org/apache/nutch/host/HostDbReader.java Fri Jan 9 06:34:33 2015 @@ -39,7 +39,8 @@ import org.apache.nutch.util.TableUtil; public class HostDbReader extends Configured implements Tool { public static final Log LOG = LogFactory.getLog(HostDbReader.class); - private void read(String key) throws ClassNotFoundException, IOException, Exception { + private void read(String key) throws ClassNotFoundException, IOException, + Exception { DataStore<String, Host> datastore = StorageUtils.createWebStore(getConf(), String.class, Host.class); Modified: nutch/branches/2.x/src/java/org/apache/nutch/host/HostDbUpdateJob.java URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/host/HostDbUpdateJob.java?rev=1650447&r1=1650446&r2=1650447&view=diff ============================================================================== --- nutch/branches/2.x/src/java/org/apache/nutch/host/HostDbUpdateJob.java (original) +++ nutch/branches/2.x/src/java/org/apache/nutch/host/HostDbUpdateJob.java Fri Jan 9 06:34:33 2015 @@ -116,15 +116,14 @@ public class HostDbUpdateJob implements @Override public int run(String[] args) throws Exception { - boolean linkDb=false; + boolean linkDb = false; for (int i = 0; i < args.length; i++) { if ("-linkDb".equals(args[i])) { linkDb = true; } else if ("-crawlId".equals(args[i])) { getConf().set(Nutch.CRAWL_ID_KEY, args[++i]); - } - else { - throw new IllegalArgumentException("unrecognized arg " + args[i] + } else { + throw new IllegalArgumentException("unrecognized arg " + args[i] + " usage: (-linkDb) (-crawlId <crawlId>)"); } } Modified: nutch/branches/2.x/src/java/org/apache/nutch/host/HostDbUpdateReducer.java URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/host/HostDbUpdateReducer.java?rev=1650447&r1=1650446&r2=1650447&view=diff ============================================================================== --- nutch/branches/2.x/src/java/org/apache/nutch/host/HostDbUpdateReducer.java (original) +++ nutch/branches/2.x/src/java/org/apache/nutch/host/HostDbUpdateReducer.java Fri Jan 9 06:34:33 2015 @@ -30,63 +30,68 @@ import java.nio.ByteBuffer; import java.util.Set; /** - * Combines all WebPages with the same host key to create a Host object, - * with some statistics. + * Combines all WebPages with the same host key to create a Host object, with + * some statistics. */ -public class HostDbUpdateReducer extends GoraReducer<Text, WebPage, String, Host> { - +public class HostDbUpdateReducer extends + GoraReducer<Text, WebPage, String, Host> { + @Override protected void reduce(Text key, Iterable<WebPage> values, Context context) - throws IOException, InterruptedException { - + throws IOException, InterruptedException { + int numPages = 0; int numFetched = 0; boolean buildLinkDb = true; - + Histogram<String> inlinkCount = new Histogram<String>(); Histogram<String> outlinkCount = new Histogram<String>(); - - for (WebPage page: values) { + + for (WebPage page : values) { // count number of pages - numPages++; + numPages++; // count number of fetched pages if (page.getStatus() == CrawlStatus.STATUS_FETCHED) { numFetched++; } - + // build host link db // TODO: limit number of links if (buildLinkDb) { if (page.getInlinks() != null) { Set<CharSequence> inlinks = page.getInlinks().keySet(); - for (CharSequence inlink: inlinks) { + for (CharSequence inlink : inlinks) { String host = URLUtil.getHost(inlink.toString()); inlinkCount.add(host); } } if (page.getOutlinks() != null) { Set<CharSequence> outlinks = page.getOutlinks().keySet(); - for (CharSequence outlink: outlinks) { + for (CharSequence outlink : outlinks) { String host = URLUtil.getHost(outlink.toString()); outlinkCount.add(host); } } } } - + // output host data Host host = new Host(); - host.getMetadata().put(new Utf8("p"),ByteBuffer.wrap(Integer.toString(numPages).getBytes())); + host.getMetadata().put(new Utf8("p"), + ByteBuffer.wrap(Integer.toString(numPages).getBytes())); if (numFetched > 0) { - host.getMetadata().put(new Utf8("f"),ByteBuffer.wrap(Integer.toString(numFetched).getBytes())); + host.getMetadata().put(new Utf8("f"), + ByteBuffer.wrap(Integer.toString(numFetched).getBytes())); } - for (String inlink: inlinkCount.getKeys()) { - host.getInlinks().put(new Utf8(inlink), new Utf8(Integer.toString(inlinkCount.getCount(inlink)))); + for (String inlink : inlinkCount.getKeys()) { + host.getInlinks().put(new Utf8(inlink), + new Utf8(Integer.toString(inlinkCount.getCount(inlink)))); } - for (String outlink: outlinkCount.getKeys()) { - host.getOutlinks().put(new Utf8(outlink), new Utf8(Integer.toString(outlinkCount.getCount(outlink)))); + for (String outlink : outlinkCount.getKeys()) { + host.getOutlinks().put(new Utf8(outlink), + new Utf8(Integer.toString(outlinkCount.getCount(outlink)))); } - + context.write(key.toString(), host); } } Modified: nutch/branches/2.x/src/java/org/apache/nutch/host/HostInjectorJob.java URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/host/HostInjectorJob.java?rev=1650447&r1=1650446&r2=1650447&view=diff ============================================================================== --- nutch/branches/2.x/src/java/org/apache/nutch/host/HostInjectorJob.java (original) +++ nutch/branches/2.x/src/java/org/apache/nutch/host/HostInjectorJob.java Fri Jan 9 06:34:33 2015 @@ -123,13 +123,14 @@ public class HostInjectorJob implements while (keysIter.hasNext()) { String keymd = keysIter.next(); String valuemd = metadata.get(keymd); - host.getMetadata().put(new Utf8(keymd), ByteBuffer.wrap(valuemd.getBytes())); + host.getMetadata().put(new Utf8(keymd), + ByteBuffer.wrap(valuemd.getBytes())); } String hostname; - if (url.indexOf("://")> -1) { - hostname=new URL(url).getHost(); + if (url.indexOf("://") > -1) { + hostname = new URL(url).getHost(); } else { - hostname=new URL("http://"+url).getHost(); + hostname = new URL("http://" + url).getHost(); } String hostkey = TableUtil.reverseHost(hostname); context.write(hostkey, host); @@ -145,8 +146,8 @@ public class HostInjectorJob implements job.setMapOutputKeyClass(String.class); job.setMapOutputValueClass(Host.class); job.setOutputFormatClass(GoraOutputFormat.class); - GoraOutputFormat.setOutput(job, - StorageUtils.createWebStore(job.getConfiguration(), String.class, Host.class), true); + GoraOutputFormat.setOutput(job, StorageUtils.createWebStore( + job.getConfiguration(), String.class, Host.class), true); job.setReducerClass(Reducer.class); job.setNumReduceTasks(0); return job.waitForCompletion(true); Modified: nutch/branches/2.x/src/java/org/apache/nutch/host/package-info.java URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/host/package-info.java?rev=1650447&r1=1650446&r2=1650447&view=diff ============================================================================== --- nutch/branches/2.x/src/java/org/apache/nutch/host/package-info.java (original) +++ nutch/branches/2.x/src/java/org/apache/nutch/host/package-info.java Fri Jan 9 06:34:33 2015 @@ -19,3 +19,4 @@ * Host database to store metadata per host. */ package org.apache.nutch.host; + Modified: nutch/branches/2.x/src/java/org/apache/nutch/indexer/CleaningJob.java URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/indexer/CleaningJob.java?rev=1650447&r1=1650446&r2=1650447&view=diff ============================================================================== --- nutch/branches/2.x/src/java/org/apache/nutch/indexer/CleaningJob.java (original) +++ nutch/branches/2.x/src/java/org/apache/nutch/indexer/CleaningJob.java Fri Jan 9 06:34:33 2015 @@ -44,26 +44,25 @@ import org.apache.nutch.util.ToolUtil; public class CleaningJob extends NutchTool implements Tool { public static final String ARG_COMMIT = "commit"; - public static final Logger LOG = LoggerFactory - .getLogger(CleaningJob.class); + public static final Logger LOG = LoggerFactory.getLogger(CleaningJob.class); private Configuration conf; private static final Collection<WebPage.Field> FIELDS = new HashSet<WebPage.Field>(); - + static { FIELDS.add(WebPage.Field.STATUS); } - + @Override public Configuration getConf() { return conf; } - + @Override public void setConf(Configuration conf) { this.conf = conf; } - + public Collection<WebPage.Field> getFields(Job job) { Configuration conf = job.getConfiguration(); Collection<WebPage.Field> columns = new HashSet<WebPage.Field>(FIELDS); @@ -96,7 +95,7 @@ public class CleaningJob extends NutchTo } } } - + public static class CleanReducer extends Reducer<String, WebPage, NullWritable, NullWritable> { private int numDeletes = 0; @@ -128,12 +127,11 @@ public class CleaningJob extends NutchTo writers.close(); if (numDeletes > 0 && commit) { writers.commit(); - } + } LOG.info("CleaningJob: deleted a total of " + numDeletes + " documents"); } } - @Override public Map<String, Object> run(Map<String, Object> args) throws Exception { getConf().setBoolean(ARG_COMMIT, (Boolean) args.get(ARG_COMMIT)); Modified: nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexCleaningFilter.java URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexCleaningFilter.java?rev=1650447&r1=1650446&r2=1650447&view=diff ============================================================================== --- nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexCleaningFilter.java (original) +++ nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexCleaningFilter.java Fri Jan 9 06:34:33 2015 @@ -22,21 +22,21 @@ import org.apache.hadoop.conf.Configurab import org.apache.nutch.plugin.FieldPluggable; import org.apache.nutch.storage.WebPage; - -/** Extension point for indexing. Permits one to add metadata to the indexed - * fields. All plugins found which implement this extension point are run +/** + * Extension point for indexing. Permits one to add metadata to the indexed + * fields. All plugins found which implement this extension point are run * sequentially on the parse. */ public interface IndexCleaningFilter extends FieldPluggable, Configurable { /** The name of the extension point. */ final static String X_POINT_ID = IndexCleaningFilter.class.getName(); - /** - * @param url page url + /** + * @param url + * page url * @param page * @return true == remove false == keep * @throws IndexingException */ - boolean remove(String url, WebPage page) - throws IndexingException; + boolean remove(String url, WebPage page) throws IndexingException; } Modified: nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexCleaningFilters.java URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexCleaningFilters.java?rev=1650447&r1=1650446&r2=1650447&view=diff ============================================================================== --- nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexCleaningFilters.java (original) +++ nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexCleaningFilters.java Fri Jan 9 06:34:33 2015 @@ -32,12 +32,13 @@ import org.apache.nutch.plugin.PluginRun import org.apache.nutch.storage.WebPage; import org.apache.nutch.util.ObjectCache; -/** Creates and caches {@link IndexCleaningFilter} implementing plugins.*/ +/** Creates and caches {@link IndexCleaningFilter} implementing plugins. */ public class IndexCleaningFilters { public static final String IndexCleaningFilter_ORDER = "IndexCleaningFilterhbase.order"; - public final static Logger LOG = LoggerFactory.getLogger(IndexCleaningFilters.class); + public final static Logger LOG = LoggerFactory + .getLogger(IndexCleaningFilters.class); private IndexCleaningFilter[] indexcleaningFilters; @@ -60,10 +61,10 @@ public class IndexCleaningFilters { ExtensionPoint point = PluginRepository.get(conf).getExtensionPoint( IndexCleaningFilter.X_POINT_ID); if (point == null) - throw new RuntimeException(IndexCleaningFilter.X_POINT_ID + " not found."); + throw new RuntimeException(IndexCleaningFilter.X_POINT_ID + + " not found."); Extension[] extensions = point.getExtensions(); - HashMap<String, IndexCleaningFilter> filterMap = - new HashMap<String, IndexCleaningFilter>(); + HashMap<String, IndexCleaningFilter> filterMap = new HashMap<String, IndexCleaningFilter>(); for (int i = 0; i < extensions.length; i++) { Extension extension = extensions[i]; IndexCleaningFilter filter = (IndexCleaningFilter) extension @@ -78,20 +79,19 @@ public class IndexCleaningFilters { * indeterminate order */ if (orderedFilters == null) { - objectCache.setObject(IndexCleaningFilter.class.getName(), - filterMap.values().toArray( - new IndexCleaningFilter[0])); + objectCache.setObject(IndexCleaningFilter.class.getName(), filterMap + .values().toArray(new IndexCleaningFilter[0])); /* Otherwise run the filters in the required order */ } else { ArrayList<IndexCleaningFilter> filters = new ArrayList<IndexCleaningFilter>(); for (int i = 0; i < orderedFilters.length; i++) { - IndexCleaningFilter filter = filterMap.get(orderedFilters[i]); + IndexCleaningFilter filter = filterMap.get(orderedFilters[i]); if (filter != null) { filters.add(filter); } } - objectCache.setObject(IndexCleaningFilter.class.getName(), filters - .toArray(new IndexCleaningFilter[filters.size()])); + objectCache.setObject(IndexCleaningFilter.class.getName(), + filters.toArray(new IndexCleaningFilter[filters.size()])); } } catch (PluginRuntimeException e) { throw new RuntimeException(e); @@ -100,13 +100,13 @@ public class IndexCleaningFilters { .getObject(IndexCleaningFilter.class.getName()); } } + /** Run all defined filters. */ - public boolean remove(String url, WebPage page) - throws IndexingException { + public boolean remove(String url, WebPage page) throws IndexingException { for (IndexCleaningFilter indexcleaningFilter : indexcleaningFilters) { - if(indexcleaningFilter.remove(url,page)){ - return true; - } + if (indexcleaningFilter.remove(url, page)) { + return true; + } } return false; } Modified: nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexUtil.java URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexUtil.java?rev=1650447&r1=1650446&r2=1650447&view=diff ============================================================================== --- nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexUtil.java (original) +++ nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexUtil.java Fri Jan 9 06:34:33 2015 @@ -26,37 +26,41 @@ import org.apache.nutch.util.StringUtil; import org.apache.nutch.util.TableUtil; /** - * Utility to create an indexed document from a webpage. - * + * Utility to create an indexed document from a webpage. + * */ public class IndexUtil { private static final Log LOG = LogFactory.getLog(new Object() { }.getClass().getEnclosingClass()); - - + private IndexingFilters filters; private ScoringFilters scoringFilters; - + public IndexUtil(Configuration conf) { filters = new IndexingFilters(conf); scoringFilters = new ScoringFilters(conf); } - + /** * Index a {@link Webpage}, here we add the following fields: * <ol> * <li><tt>id</tt>: default uniqueKey for the {@link NutchDocument}.</li> - * <li><tt>digest</tt>: Digest is used to identify pages (like unique ID) and is used to remove - * duplicates during the dedup procedure. It is calculated using {@link org.apache.nutch.crawl.MD5Signature} or + * <li><tt>digest</tt>: Digest is used to identify pages (like unique ID) and + * is used to remove duplicates during the dedup procedure. It is calculated + * using {@link org.apache.nutch.crawl.MD5Signature} or * {@link org.apache.nutch.crawl.TextProfileSignature}.</li> - * <li><tt>batchId</tt>: The page belongs to a unique batchId, this is its identifier.</li> - * <li><tt>boost</tt>: Boost is used to calculate document (field) score which can be used within - * queries submitted to the underlying indexing library to find the best results. It's part of the scoring algorithms. - * See scoring.link, scoring.opic, scoring.tld, etc.</li> + * <li><tt>batchId</tt>: The page belongs to a unique batchId, this is its + * identifier.</li> + * <li><tt>boost</tt>: Boost is used to calculate document (field) score which + * can be used within queries submitted to the underlying indexing library to + * find the best results. It's part of the scoring algorithms. See + * scoring.link, scoring.opic, scoring.tld, etc.</li> * </ol> * - * @param key The key of the page (reversed url). - * @param page The {@link Webpage}. + * @param key + * The key of the page (reversed url). + * @param page + * The {@link Webpage}. * @return The indexed document, or null if skipped by index filters. */ public NutchDocument index(String key, WebPage page) { @@ -66,7 +70,7 @@ public class IndexUtil { if (page.getBatchId() != null) { doc.add("batchId", page.getBatchId().toString()); } - + String url = TableUtil.unreverseUrl(key); if (LOG.isDebugEnabled()) { @@ -76,12 +80,13 @@ public class IndexUtil { try { doc = filters.filter(doc, url, page); } catch (IndexingException e) { - LOG.warn("Error indexing "+key+": "+e); + LOG.warn("Error indexing " + key + ": " + e); return null; } // skip documents discarded by indexing filters - if (doc == null) return null; + if (doc == null) + return null; float boost = 1.0f; // run scoring filters @@ -98,5 +103,5 @@ public class IndexUtil { return doc; } - + } Modified: nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexWriter.java URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexWriter.java?rev=1650447&r1=1650446&r2=1650447&view=diff ============================================================================== --- nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexWriter.java (original) +++ nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexWriter.java Fri Jan 9 06:34:33 2015 @@ -26,19 +26,22 @@ import org.apache.nutch.plugin.Pluggable public interface IndexWriter extends Configurable, Pluggable { /** The name of the extension point. */ final static String X_POINT_ID = IndexWriter.class.getName(); - + public void open(Configuration job) throws IOException; public void write(NutchDocument doc) throws IOException; - + public void delete(String key) throws IOException; - + public void update(NutchDocument doc) throws IOException; - + public void commit() throws IOException; public void close() throws IOException; - - /** Returns a String describing the IndexWriter instance and the specific parameters it can take */ + + /** + * Returns a String describing the IndexWriter instance and the specific + * parameters it can take + */ public String describe(); } Modified: nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexWriters.java URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexWriters.java?rev=1650447&r1=1650446&r2=1650447&view=diff ============================================================================== --- nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexWriters.java (original) +++ nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexWriters.java Fri Jan 9 06:34:33 2015 @@ -32,8 +32,7 @@ import org.slf4j.LoggerFactory; /** Creates and caches {@link IndexWriter} implementing plugins. */ public class IndexWriters { - public final static Logger LOG = LoggerFactory - .getLogger(IndexWriters.class); + public final static Logger LOG = LoggerFactory.getLogger(IndexWriters.class); private IndexWriter[] indexWriters; @@ -44,17 +43,15 @@ public class IndexWriters { .getObject(IndexWriter.class.getName()); if (this.indexWriters == null) { try { - ExtensionPoint point = PluginRepository.get(conf) - .getExtensionPoint(IndexWriter.X_POINT_ID); + ExtensionPoint point = PluginRepository.get(conf).getExtensionPoint( + IndexWriter.X_POINT_ID); if (point == null) - throw new RuntimeException(IndexWriter.X_POINT_ID - + " not found."); + throw new RuntimeException(IndexWriter.X_POINT_ID + " not found."); Extension[] extensions = point.getExtensions(); HashMap<String, IndexWriter> indexerMap = new HashMap<String, IndexWriter>(); for (int i = 0; i < extensions.length; i++) { Extension extension = extensions[i]; - IndexWriter writer = (IndexWriter) extension - .getExtensionInstance(); + IndexWriter writer = (IndexWriter) extension.getExtensionInstance(); LOG.info("Adding " + writer.getClass().getName()); if (!indexerMap.containsKey(writer.getClass().getName())) { indexerMap.put(writer.getClass().getName(), writer); Modified: nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java?rev=1650447&r1=1650446&r2=1650447&view=diff ============================================================================== --- nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java (original) +++ nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java Fri Jan 9 06:34:33 2015 @@ -30,29 +30,29 @@ public class IndexerOutputFormat extends public RecordWriter<String, NutchDocument> getRecordWriter( TaskAttemptContext job) throws IOException, InterruptedException { - //final IndexWriter[] writers = - // NutchIndexWriterFactory.getNutchIndexWriters(job.getConfiguration()); + // final IndexWriter[] writers = + // NutchIndexWriterFactory.getNutchIndexWriters(job.getConfiguration()); final IndexWriters writers = new IndexWriters(job.getConfiguration()); - -// for (final IndexWriter writer : writers) { -// writer.open(job); -// } + + // for (final IndexWriter writer : writers) { + // writer.open(job); + // } writers.open(job.getConfiguration()); - + return new RecordWriter<String, NutchDocument>() { @Override public void write(String key, NutchDocument doc) throws IOException { - // TODO: Check Write Status for delete or write. + // TODO: Check Write Status for delete or write. writers.write(doc); } @Override public void close(TaskAttemptContext context) throws IOException, - InterruptedException { - writers.close(); - } + InterruptedException { + writers.close(); + } }; } @@ -64,21 +64,26 @@ public class IndexerOutputFormat extends @Override public OutputCommitter getOutputCommitter(TaskAttemptContext arg0) throws IOException, InterruptedException { - //return an empty outputcommitter + // return an empty outputcommitter return new OutputCommitter() { @Override public void setupTask(TaskAttemptContext arg0) throws IOException { } + @Override public void setupJob(JobContext arg0) throws IOException { } + @Override - public boolean needsTaskCommit(TaskAttemptContext arg0) throws IOException { + public boolean needsTaskCommit(TaskAttemptContext arg0) + throws IOException { return false; } + @Override public void commitTask(TaskAttemptContext arg0) throws IOException { } + @Override public void abortTask(TaskAttemptContext arg0) throws IOException { } Modified: nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexingFilter.java URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexingFilter.java?rev=1650447&r1=1650446&r2=1650447&view=diff ============================================================================== --- nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexingFilter.java (original) +++ nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexingFilter.java Fri Jan 9 06:34:33 2015 @@ -22,9 +22,9 @@ import org.apache.hadoop.conf.Configurab import org.apache.nutch.plugin.FieldPluggable; import org.apache.nutch.storage.WebPage; - -/** Extension point for indexing. Permits one to add metadata to the indexed - * fields. All plugins found which implement this extension point are run +/** + * Extension point for indexing. Permits one to add metadata to the indexed + * fields. All plugins found which implement this extension point are run * sequentially on the parse. */ public interface IndexingFilter extends FieldPluggable, Configurable { @@ -33,15 +33,18 @@ public interface IndexingFilter extends /** * Adds fields or otherwise modifies the document that will be indexed for a - * parse. Unwanted documents can be removed from indexing by returning a null value. - * - * @param doc document instance for collecting fields - * @param url page url + * parse. Unwanted documents can be removed from indexing by returning a null + * value. + * + * @param doc + * document instance for collecting fields + * @param url + * page url * @param page - * @return modified (or a new) document instance, or null (meaning the document - * should be discarded) + * @return modified (or a new) document instance, or null (meaning the + * document should be discarded) * @throws IndexingException */ NutchDocument filter(NutchDocument doc, String url, WebPage page) - throws IndexingException; + throws IndexingException; } Modified: nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexingFilters.java URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexingFilters.java?rev=1650447&r1=1650446&r2=1650447&view=diff ============================================================================== --- nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexingFilters.java (original) +++ nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexingFilters.java Fri Jan 9 06:34:33 2015 @@ -32,12 +32,13 @@ import org.apache.nutch.plugin.PluginRun import org.apache.nutch.storage.WebPage; import org.apache.nutch.util.ObjectCache; -/** Creates and caches {@link IndexingFilter} implementing plugins.*/ +/** Creates and caches {@link IndexingFilter} implementing plugins. */ public class IndexingFilters { public static final String INDEXINGFILTER_ORDER = "indexingfilter.order"; - public final static Logger LOG = LoggerFactory.getLogger(IndexingFilters.class); + public final static Logger LOG = LoggerFactory + .getLogger(IndexingFilters.class); private IndexingFilter[] indexingFilters; @@ -62,8 +63,7 @@ public class IndexingFilters { if (point == null) throw new RuntimeException(IndexingFilter.X_POINT_ID + " not found."); Extension[] extensions = point.getExtensions(); - HashMap<String, IndexingFilter> filterMap = - new HashMap<String, IndexingFilter>(); + HashMap<String, IndexingFilter> filterMap = new HashMap<String, IndexingFilter>(); for (int i = 0; i < extensions.length; i++) { Extension extension = extensions[i]; IndexingFilter filter = (IndexingFilter) extension @@ -78,9 +78,8 @@ public class IndexingFilters { * indeterminate order */ if (orderedFilters == null) { - objectCache.setObject(IndexingFilter.class.getName(), - filterMap.values().toArray( - new IndexingFilter[0])); + objectCache.setObject(IndexingFilter.class.getName(), filterMap + .values().toArray(new IndexingFilter[0])); /* Otherwise run the filters in the required order */ } else { ArrayList<IndexingFilter> filters = new ArrayList<IndexingFilter>(); @@ -90,8 +89,8 @@ public class IndexingFilters { filters.add(filter); } } - objectCache.setObject(IndexingFilter.class.getName(), filters - .toArray(new IndexingFilter[filters.size()])); + objectCache.setObject(IndexingFilter.class.getName(), + filters.toArray(new IndexingFilter[filters.size()])); } } catch (PluginRuntimeException e) { throw new RuntimeException(e); @@ -100,23 +99,24 @@ public class IndexingFilters { .getObject(IndexingFilter.class.getName()); } } + /** Run all defined filters. */ public NutchDocument filter(NutchDocument doc, String url, WebPage page) - throws IndexingException { + throws IndexingException { for (IndexingFilter indexingFilter : indexingFilters) { doc = indexingFilter.filter(doc, url, page); // break the loop if an indexing filter discards the doc - if (doc == null) return null; + if (doc == null) + return null; } return doc; } /** - * Gets all the fields for a given {@link WebPage} - * Many datastores need to setup the mapreduce job by specifying the fields - * needed. All extensions that work on WebPage are able to specify what fields - * they need. + * Gets all the fields for a given {@link WebPage} Many datastores need to + * setup the mapreduce job by specifying the fields needed. All extensions + * that work on WebPage are able to specify what fields they need. */ public Collection<WebPage.Field> getFields() { Collection<WebPage.Field> columns = new HashSet<WebPage.Field>(); Modified: nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexingFiltersChecker.java URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexingFiltersChecker.java?rev=1650447&r1=1650446&r2=1650447&view=diff ============================================================================== --- nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexingFiltersChecker.java (original) +++ nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexingFiltersChecker.java Fri Jan 9 06:34:33 2015 @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - + package org.apache.nutch.indexer; import java.nio.ByteBuffer; @@ -43,16 +43,19 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Reads and parses a URL and run the indexers on it. Displays the fields obtained and the first - * 100 characters of their value - * - * Tested with e.g. ./nutch org.apache.nutch.indexer.IndexingFiltersChecker http://www.lemonde.fr + * Reads and parses a URL and run the indexers on it. Displays the fields + * obtained and the first 100 characters of their value + * + * Tested with e.g. ./nutch org.apache.nutch.indexer.IndexingFiltersChecker + * http://www.lemonde.fr + * * @author Julien Nioche **/ public class IndexingFiltersChecker extends Configured implements Tool { - public static final Logger LOG = LoggerFactory.getLogger(IndexingFiltersChecker.class); + public static final Logger LOG = LoggerFactory + .getLogger(IndexingFiltersChecker.class); public IndexingFiltersChecker() { @@ -85,7 +88,7 @@ public class IndexingFiltersChecker exte ProtocolOutput protocolOutput = protocol.getProtocolOutput(url, page); page.setProtocolStatus(protocolOutput.getStatus()); if (protocolOutput.getStatus().getCode() == ProtocolStatusCodes.SUCCESS) { - page.setStatus((int)CrawlStatus.STATUS_FETCHED); + page.setStatus((int) CrawlStatus.STATUS_FETCHED); page.setFetchTime(System.currentTimeMillis()); } else { LOG.error("Fetch failed with protocol status: " @@ -93,7 +96,7 @@ public class IndexingFiltersChecker exte + ": " + ProtocolStatusUtils.getMessage(protocolOutput.getStatus())); return -1; } - + Content content = protocolOutput.getContent(); if (content == null) { LOG.warn("No content for " + url); @@ -106,7 +109,7 @@ public class IndexingFiltersChecker exte return -1; } page.setContentType(new Utf8(contentType)); - + if (LOG.isInfoEnabled()) { LOG.info("parsing: " + url); LOG.info("contentType: " + contentType); @@ -136,7 +139,7 @@ public class IndexingFiltersChecker exte LOG.info("Document discarded by indexing filter"); return 0; } - + for (String fname : doc.getFieldNames()) { List<String> values = doc.getFieldValues(fname); if (values != null) { Modified: nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexingJob.java URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexingJob.java?rev=1650447&r1=1650446&r2=1650447&view=diff ============================================================================== --- nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexingJob.java (original) +++ nutch/branches/2.x/src/java/org/apache/nutch/indexer/IndexingJob.java Fri Jan 9 06:34:33 2015 @@ -180,7 +180,7 @@ public class IndexingJob extends NutchTo IndexWriters writers = new IndexWriters(getConf()); LOG.info(writers.describe()); - + writers.open(getConf()); if (getConf().getBoolean(SolrConstants.COMMIT_INDEX, true)) { writers.commit(); Modified: nutch/branches/2.x/src/java/org/apache/nutch/indexer/NutchDocument.java URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/indexer/NutchDocument.java?rev=1650447&r1=1650446&r2=1650447&view=diff ============================================================================== --- nutch/branches/2.x/src/java/org/apache/nutch/indexer/NutchDocument.java (original) +++ nutch/branches/2.x/src/java/org/apache/nutch/indexer/NutchDocument.java Fri Jan 9 06:34:33 2015 @@ -33,9 +33,9 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import org.apache.nutch.metadata.Metadata; -/** A {@link NutchDocument} is the unit of indexing.*/ -public class NutchDocument -implements Writable, Iterable<Entry<String, List<String>>> { +/** A {@link NutchDocument} is the unit of indexing. */ +public class NutchDocument implements Writable, + Iterable<Entry<String, List<String>>> { public static final byte VERSION = 1; @@ -139,11 +139,11 @@ implements Writable, Iterable<Entry<Stri } /** - * A utility-like method which can easily be used to write - * any {@link org.apache.nutch.indexer.NutchDocument} object - * to string for simple debugging. + * A utility-like method which can easily be used to write any + * {@link org.apache.nutch.indexer.NutchDocument} object to string for simple + * debugging. */ - public String toString() { + public String toString() { StringBuilder sb = new StringBuilder(); sb.append("doc {\n"); for (Entry<String, List<String>> entry : fields.entrySet()) { Modified: nutch/branches/2.x/src/java/org/apache/nutch/indexer/solr/SolrConstants.java URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/indexer/solr/SolrConstants.java?rev=1650447&r1=1650446&r2=1650447&view=diff ============================================================================== --- nutch/branches/2.x/src/java/org/apache/nutch/indexer/solr/SolrConstants.java (original) +++ nutch/branches/2.x/src/java/org/apache/nutch/indexer/solr/SolrConstants.java Fri Jan 9 06:34:33 2015 @@ -22,7 +22,7 @@ public interface SolrConstants { public static final String SERVER_URL = SOLR_PREFIX + "server.url"; public static final String COMMIT_SIZE = SOLR_PREFIX + "commit.size"; - + public static final String COMMIT_INDEX = SOLR_PREFIX + "commit.index"; public static final String MAPPING_FILE = SOLR_PREFIX + "mapping.file"; @@ -32,15 +32,15 @@ public interface SolrConstants { public static final String USERNAME = SOLR_PREFIX + "auth.username"; public static final String PASSWORD = SOLR_PREFIX + "auth.password"; - + public static final String ID_FIELD = "id"; - + public static final String URL_FIELD = "url"; - + public static final String BOOST_FIELD = "boost"; - + public static final String TIMESTAMP_FIELD = "tstamp"; - + public static final String DIGEST_FIELD = "digest"; } Modified: nutch/branches/2.x/src/java/org/apache/nutch/indexer/solr/SolrDeleteDuplicates.java URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/indexer/solr/SolrDeleteDuplicates.java?rev=1650447&r1=1650446&r2=1650447&view=diff ============================================================================== --- nutch/branches/2.x/src/java/org/apache/nutch/indexer/solr/SolrDeleteDuplicates.java (original) +++ nutch/branches/2.x/src/java/org/apache/nutch/indexer/solr/SolrDeleteDuplicates.java Fri Jan 9 06:34:33 2015 @@ -51,42 +51,44 @@ import org.apache.solr.client.solrj.resp import org.apache.solr.common.SolrDocument; import org.apache.solr.common.SolrDocumentList; -/** +/** * Utility class for deleting duplicate documents from a solr index. - * + * * The algorithm goes like follows: * * Preparation: * <ol> * <li>Query the solr server for the number of documents (say, N)</li> - * <li>Partition N among M map tasks. For example, if we have two map tasks - * the first map task will deal with solr documents from 0 - (N / 2 - 1) and - * the second will deal with documents from (N / 2) to (N - 1).</li> + * <li>Partition N among M map tasks. For example, if we have two map tasks the + * first map task will deal with solr documents from 0 - (N / 2 - 1) and the + * second will deal with documents from (N / 2) to (N - 1).</li> * </ol> * * MapReduce: * <ul> - * <li>Map: Identity map where keys are digests and values are {@link SolrRecord} - * instances(which contain id, boost and timestamp)</li> + * <li>Map: Identity map where keys are digests and values are + * {@link SolrRecord} instances(which contain id, boost and timestamp)</li> * <li>Reduce: After map, {@link SolrRecord}s with the same digest will be - * grouped together. Now, of these documents with the same digests, delete - * all of them except the one with the highest score (boost field). If two - * (or more) documents have the same score, then the document with the latest - * timestamp is kept. Again, every other is deleted from solr index. - * </li> + * grouped together. Now, of these documents with the same digests, delete all + * of them except the one with the highest score (boost field). If two (or more) + * documents have the same score, then the document with the latest timestamp is + * kept. Again, every other is deleted from solr index.</li> * </ul> * - * Note that we assume that two documents in - * a solr index will never have the same URL. So this class only deals with - * documents with <b>different</b> URLs but the same digest. + * Note that we assume that two documents in a solr index will never have the + * same URL. So this class only deals with documents with <b>different</b> URLs + * but the same digest. */ public class SolrDeleteDuplicates -extends Reducer<Text, SolrDeleteDuplicates.SolrRecord, Text, SolrDeleteDuplicates.SolrRecord> -implements Tool { + extends + Reducer<Text, SolrDeleteDuplicates.SolrRecord, Text, SolrDeleteDuplicates.SolrRecord> + implements Tool { - public static final Logger LOG = LoggerFactory.getLogger(SolrDeleteDuplicates.class); + public static final Logger LOG = LoggerFactory + .getLogger(SolrDeleteDuplicates.class); - private static final String SOLR_GET_ALL_QUERY = SolrConstants.ID_FIELD + ":[* TO *]"; + private static final String SOLR_GET_ALL_QUERY = SolrConstants.ID_FIELD + + ":[* TO *]"; private static final int NUM_MAX_DELETE_REQUEST = 1000; @@ -96,7 +98,8 @@ implements Tool { private long tstamp; private String id; - public SolrRecord() { } + public SolrRecord() { + } public SolrRecord(String id, float boost, long tstamp) { this.id = id; @@ -117,10 +120,10 @@ implements Tool { } public void readSolrDocument(SolrDocument doc) { - id = (String)doc.getFieldValue(SolrConstants.ID_FIELD); - boost = (Float)doc.getFieldValue(SolrConstants.BOOST_FIELD); + id = (String) doc.getFieldValue(SolrConstants.ID_FIELD); + boost = (Float) doc.getFieldValue(SolrConstants.BOOST_FIELD); - Date buffer = (Date)doc.getFieldValue(SolrConstants.TIMESTAMP_FIELD); + Date buffer = (Date) doc.getFieldValue(SolrConstants.TIMESTAMP_FIELD); tstamp = buffer.getTime(); } @@ -136,7 +139,7 @@ implements Tool { Text.writeString(out, id); out.writeFloat(boost); out.writeLong(tstamp); - } + } } public static class SolrInputSplit extends InputSplit implements Writable { @@ -144,7 +147,8 @@ implements Tool { private int docBegin; private int numDocs; - public SolrInputSplit() { } + public SolrInputSplit() { + } public SolrInputSplit(int docBegin, int numDocs) { this.docBegin = docBegin; @@ -162,7 +166,7 @@ implements Tool { @Override public String[] getLocations() throws IOException { - return new String[] {} ; + return new String[] {}; } @Override @@ -175,9 +179,9 @@ implements Tool { public void write(DataOutput out) throws IOException { out.writeInt(docBegin); out.writeInt(numDocs); - } + } } - + public static class SolrRecordReader extends RecordReader<Text, SolrRecord> { private int currentDoc = 0; @@ -185,21 +189,22 @@ implements Tool { private Text text; private SolrRecord record; private SolrDocumentList solrDocs; - + public SolrRecordReader(SolrDocumentList solrDocs, int numDocs) { this.solrDocs = solrDocs; this.numDocs = numDocs; } - + @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { text = new Text(); - record = new SolrRecord(); + record = new SolrRecord(); } @Override - public void close() throws IOException { } + public void close() throws IOException { + } @Override public float getProgress() throws IOException { @@ -231,14 +236,14 @@ implements Tool { currentDoc++; return true; } - + }; public static class SolrInputFormat extends InputFormat<Text, SolrRecord> { - + @Override - public List<InputSplit> getSplits(JobContext context) - throws IOException, InterruptedException { + public List<InputSplit> getSplits(JobContext context) throws IOException, + InterruptedException { Configuration conf = context.getConfiguration(); int numSplits = context.getNumReduceTasks(); SolrServer solr = SolrUtils.getHttpSolrServer(conf); @@ -254,8 +259,8 @@ implements Tool { throw new IOException(e); } - int numResults = (int)response.getResults().getNumFound(); - int numDocsPerSplit = (numResults / numSplits); + int numResults = (int) response.getResults().getNumFound(); + int numDocsPerSplit = (numResults / numSplits); int currentDoc = 0; List<InputSplit> splits = new ArrayList<InputSplit>(); for (int i = 0; i < numSplits - 1; i++) { @@ -274,11 +279,10 @@ implements Tool { SolrServer solr = SolrUtils.getHttpSolrServer(conf); SolrInputSplit solrSplit = (SolrInputSplit) split; final int numDocs = (int) solrSplit.getLength(); - + SolrQuery solrQuery = new SolrQuery(SOLR_GET_ALL_QUERY); solrQuery.setFields(SolrConstants.ID_FIELD, SolrConstants.BOOST_FIELD, - SolrConstants.TIMESTAMP_FIELD, - SolrConstants.DIGEST_FIELD); + SolrConstants.TIMESTAMP_FIELD, SolrConstants.DIGEST_FIELD); solrQuery.setStart(solrSplit.getDocBegin()); solrQuery.setRows(numDocs); @@ -318,7 +322,6 @@ implements Tool { solr = SolrUtils.getHttpSolrServer(conf); } - @Override public void cleanup(Context context) throws IOException { try { @@ -334,14 +337,14 @@ implements Tool { @Override public void reduce(Text key, Iterable<SolrRecord> values, Context context) - throws IOException { + throws IOException { Iterator<SolrRecord> iterator = values.iterator(); SolrRecord recordToKeep = iterator.next(); while (iterator.hasNext()) { SolrRecord solrRecord = iterator.next(); - if (solrRecord.getBoost() > recordToKeep.getBoost() || - (solrRecord.getBoost() == recordToKeep.getBoost() && - solrRecord.getTstamp() > recordToKeep.getTstamp())) { + if (solrRecord.getBoost() > recordToKeep.getBoost() + || (solrRecord.getBoost() == recordToKeep.getBoost() && solrRecord + .getTstamp() > recordToKeep.getTstamp())) { updateRequest.deleteById(recordToKeep.id); recordToKeep = solrRecord; } else { @@ -360,13 +363,13 @@ implements Tool { } } - public boolean dedup(String solrUrl) - throws IOException, InterruptedException, ClassNotFoundException { + public boolean dedup(String solrUrl) throws IOException, + InterruptedException, ClassNotFoundException { LOG.info("SolrDeleteDuplicates: starting..."); LOG.info("SolrDeleteDuplicates: Solr url: " + solrUrl); - + getConf().set(SolrConstants.SERVER_URL, solrUrl); - + Job job = new Job(getConf(), "solrdedup"); job.setInputFormatClass(SolrInputFormat.class); @@ -376,11 +379,11 @@ implements Tool { job.setMapperClass(Mapper.class); job.setReducerClass(SolrDeleteDuplicates.class); - return job.waitForCompletion(true); + return job.waitForCompletion(true); } - public int run(String[] args) - throws IOException, InterruptedException, ClassNotFoundException { + public int run(String[] args) throws IOException, InterruptedException, + ClassNotFoundException { if (args.length != 1) { System.err.println("Usage: SolrDeleteDuplicates <solr url>"); return 1;
