This is an automated email from the ASF dual-hosted git repository.
lewismc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nutch.git
The following commit(s) were added to refs/heads/master by this push:
new 66f678e62 NUTCH-3141 Cache Hadoop Counter References in Hot Paths (#878)
66f678e62 is described below
commit 66f678e62f57de30e605a1e0d23d7923bf21c780
Author: Lewis John McGibbney <[email protected]>
AuthorDate: Thu Jan 8 09:33:06 2026 -0800
NUTCH-3141 Cache Hadoop Counter References in Hot Paths (#878)
---
src/java/org/apache/nutch/crawl/CrawlDb.java | 3 +-
.../org/apache/nutch/crawl/DeduplicationJob.java | 10 ++-
src/java/org/apache/nutch/fetcher/QueueFeeder.java | 34 +++++++---
.../apache/nutch/hostdb/UpdateHostDbMapper.java | 23 ++++---
.../apache/nutch/hostdb/UpdateHostDbReducer.java | 23 +++++--
.../org/apache/nutch/indexer/IndexerMapReduce.java | 72 ++++++++++++++++------
.../org/apache/nutch/tools/warc/WARCExporter.java | 62 +++++++++++--------
.../org/apache/nutch/util/SitemapProcessor.java | 64 ++++++++++++-------
8 files changed, 197 insertions(+), 94 deletions(-)
diff --git a/src/java/org/apache/nutch/crawl/CrawlDb.java
b/src/java/org/apache/nutch/crawl/CrawlDb.java
index 01598a5f1..32081e1d6 100644
--- a/src/java/org/apache/nutch/crawl/CrawlDb.java
+++ b/src/java/org/apache/nutch/crawl/CrawlDb.java
@@ -43,6 +43,7 @@ import
org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.metrics.NutchMetrics;
import org.apache.nutch.util.FSUtils;
import org.apache.nutch.util.HadoopFSUtil;
import org.apache.nutch.util.LockUtil;
@@ -145,7 +146,7 @@ public class CrawlDb extends NutchTool implements Tool {
if (filter) {
long urlsFiltered = job.getCounters()
- .findCounter("CrawlDB filter", "URLs filtered").getValue();
+ .findCounter(NutchMetrics.GROUP_CRAWLDB_FILTER,
NutchMetrics.CRAWLDB_URLS_FILTERED_TOTAL).getValue();
LOG.info(
"CrawlDb update: Total number of existing URLs in CrawlDb rejected
by URL filters: {}",
urlsFiltered);
diff --git a/src/java/org/apache/nutch/crawl/DeduplicationJob.java
b/src/java/org/apache/nutch/crawl/DeduplicationJob.java
index cdb291fe8..d5f983a27 100644
--- a/src/java/org/apache/nutch/crawl/DeduplicationJob.java
+++ b/src/java/org/apache/nutch/crawl/DeduplicationJob.java
@@ -335,12 +335,10 @@ public class DeduplicationJob extends NutchTool
implements Tool {
fs.delete(tempDir, true);
throw new RuntimeException(message);
}
- CounterGroup g = job.getCounters().getGroup("DeduplicationJobStatus");
- if (g != null) {
- Counter counter = g.findCounter("Documents marked as duplicate");
- long dups = counter.getValue();
- LOG.info("Deduplication: {} documents marked as duplicates", dups);
- }
+ long dups = job.getCounters()
+ .findCounter(NutchMetrics.GROUP_DEDUP,
NutchMetrics.DEDUP_DOCUMENTS_MARKED_DUPLICATE_TOTAL)
+ .getValue();
+ LOG.info("Deduplication: {} documents marked as duplicates", dups);
} catch (IOException | InterruptedException | ClassNotFoundException e) {
LOG.error("DeduplicationJob:", e);
fs.delete(tempDir, true);
diff --git a/src/java/org/apache/nutch/fetcher/QueueFeeder.java
b/src/java/org/apache/nutch/fetcher/QueueFeeder.java
index 6ee973dd3..5dfa24fd0 100644
--- a/src/java/org/apache/nutch/fetcher/QueueFeeder.java
+++ b/src/java/org/apache/nutch/fetcher/QueueFeeder.java
@@ -22,6 +22,7 @@ import java.net.MalformedURLException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.fetcher.FetchItemQueues.QueuingStatus;
import org.apache.nutch.fetcher.Fetcher.FetcherRun;
@@ -48,6 +49,12 @@ public class QueueFeeder extends Thread {
private URLNormalizers urlNormalizers = null;
private String urlNormalizerScope = URLNormalizers.SCOPE_DEFAULT;
+ // Cached counter references to avoid repeated lookups in hot paths
+ private Counter hitByTimeoutCounter;
+ private Counter hitByTimelimitCounter;
+ private Counter filteredCounter;
+ private Counter aboveExceptionThresholdCounter;
+
public QueueFeeder(FetcherRun.Context context,
FetchItemQueues queues, int size) {
this.context = context;
@@ -62,6 +69,21 @@ public class QueueFeeder extends Thread {
if (conf.getBoolean("fetcher.normalize.urls", false)) {
urlNormalizers = new URLNormalizers(conf, urlNormalizerScope);
}
+ initCounters();
+ }
+
+ /**
+ * Initialize cached counter references to avoid repeated lookups in hot
paths.
+ */
+ private void initCounters() {
+ hitByTimeoutCounter = context.getCounter(
+ NutchMetrics.GROUP_FETCHER, NutchMetrics.FETCHER_HIT_BY_TIMEOUT_TOTAL);
+ hitByTimelimitCounter = context.getCounter(
+ NutchMetrics.GROUP_FETCHER,
NutchMetrics.FETCHER_HIT_BY_TIMELIMIT_TOTAL);
+ filteredCounter = context.getCounter(
+ NutchMetrics.GROUP_FETCHER, NutchMetrics.FETCHER_FILTERED_TOTAL);
+ aboveExceptionThresholdCounter = context.getCounter(
+ NutchMetrics.GROUP_FETCHER,
NutchMetrics.FETCHER_ABOVE_EXCEPTION_THRESHOLD_TOTAL);
}
/** Filter and normalize the url */
@@ -95,16 +117,14 @@ public class QueueFeeder extends Thread {
LOG.info("QueueFeeder stopping, timeout reached.");
}
queuingStatus[qstatus]++;
- context.getCounter(NutchMetrics.GROUP_FETCHER,
- NutchMetrics.FETCHER_HIT_BY_TIMEOUT_TOTAL).increment(1);
+ hitByTimeoutCounter.increment(1);
} else {
int qstatus = QueuingStatus.HIT_BY_TIMELIMIT.ordinal();
if (queuingStatus[qstatus] == 0) {
LOG.info("QueueFeeder stopping, timelimit exceeded.");
}
queuingStatus[qstatus]++;
- context.getCounter(NutchMetrics.GROUP_FETCHER,
- NutchMetrics.FETCHER_HIT_BY_TIMELIMIT_TOTAL).increment(1);
+ hitByTimelimitCounter.increment(1);
}
try {
hasMore = context.nextKeyValue();
@@ -136,8 +156,7 @@ public class QueueFeeder extends Thread {
String u = filterNormalize(url.toString());
if (u == null) {
// filtered or failed to normalize
- context.getCounter(NutchMetrics.GROUP_FETCHER,
- NutchMetrics.FETCHER_FILTERED_TOTAL).increment(1);
+ filteredCounter.increment(1);
continue;
}
url = new Text(u);
@@ -154,8 +173,7 @@ public class QueueFeeder extends Thread {
QueuingStatus status = queues.addFetchItem(url, datum);
queuingStatus[status.ordinal()]++;
if (status == QueuingStatus.ABOVE_EXCEPTION_THRESHOLD) {
- context.getCounter(NutchMetrics.GROUP_FETCHER,
-
NutchMetrics.FETCHER_ABOVE_EXCEPTION_THRESHOLD_TOTAL).increment(1);
+ aboveExceptionThresholdCounter.increment(1);
}
cnt++;
feed--;
diff --git a/src/java/org/apache/nutch/hostdb/UpdateHostDbMapper.java
b/src/java/org/apache/nutch/hostdb/UpdateHostDbMapper.java
index 1495f7491..8de2dcdf2 100644
--- a/src/java/org/apache/nutch/hostdb/UpdateHostDbMapper.java
+++ b/src/java/org/apache/nutch/hostdb/UpdateHostDbMapper.java
@@ -24,6 +24,7 @@ import java.net.URL;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.conf.Configuration;
@@ -61,6 +62,10 @@ public class UpdateHostDbMapper
protected URLFilters filters = null;
protected URLNormalizers normalizers = null;
+ // Cached counter references to avoid repeated lookups in hot paths
+ protected Counter malformedUrlCounter;
+ protected Counter filteredRecordsCounter;
+
@Override
public void setup(Mapper<Text, Writable, Text, NutchWritable>.Context
context) {
Configuration conf = context.getConfiguration();
@@ -72,6 +77,12 @@ public class UpdateHostDbMapper
filters = new URLFilters(conf);
if (normalize)
normalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_DEFAULT);
+
+ // Initialize cached counter references
+ malformedUrlCounter = context.getCounter(
+ NutchMetrics.GROUP_HOSTDB, NutchMetrics.HOSTDB_MALFORMED_URL_TOTAL);
+ filteredRecordsCounter = context.getCounter(
+ NutchMetrics.GROUP_HOSTDB, NutchMetrics.HOSTDB_FILTERED_RECORDS_TOTAL);
}
/**
@@ -137,8 +148,7 @@ public class UpdateHostDbMapper
try {
url = new URL(keyStr);
} catch (MalformedURLException e) {
- context.getCounter(NutchMetrics.GROUP_HOSTDB,
- NutchMetrics.HOSTDB_MALFORMED_URL_TOTAL).increment(1);
+ malformedUrlCounter.increment(1);
return;
}
String hostName = URLUtil.getHost(url);
@@ -148,8 +158,7 @@ public class UpdateHostDbMapper
// Filtered out?
if (buffer == null) {
- context.getCounter(NutchMetrics.GROUP_HOSTDB,
- NutchMetrics.HOSTDB_FILTERED_RECORDS_TOTAL).increment(1);
+ filteredRecordsCounter.increment(1);
LOG.debug("UpdateHostDb: {} crawldatum has been filtered", hostName);
return;
}
@@ -222,8 +231,7 @@ public class UpdateHostDbMapper
// Filtered out?
if (buffer == null) {
- context.getCounter(NutchMetrics.GROUP_HOSTDB,
- NutchMetrics.HOSTDB_FILTERED_RECORDS_TOTAL).increment(1);
+ filteredRecordsCounter.increment(1);
LOG.debug("UpdateHostDb: {} hostdatum has been filtered", keyStr);
return;
}
@@ -247,8 +255,7 @@ public class UpdateHostDbMapper
// Filtered out?
if (buffer == null) {
- context.getCounter(NutchMetrics.GROUP_HOSTDB,
- NutchMetrics.HOSTDB_FILTERED_RECORDS_TOTAL).increment(1);
+ filteredRecordsCounter.increment(1);
LOG.debug("UpdateHostDb: {} score has been filtered", keyStr);
return;
}
diff --git a/src/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java
b/src/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java
index 039fa5ba1..6c979f222 100644
--- a/src/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java
+++ b/src/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.StringUtils;
@@ -73,6 +74,11 @@ public class UpdateHostDbReducer
protected BlockingQueue<Runnable> queue = new SynchronousQueue<>();
protected ThreadPoolExecutor executor = null;
+ // Cached counter references to avoid repeated lookups in hot paths
+ protected Counter urlLimitNotReachedCounter;
+ protected Counter totalHostsCounter;
+ protected Counter skippedNotEligibleCounter;
+
/**
* Configures the thread pool and prestarts all resolver threads.
*/
@@ -146,6 +152,14 @@ public class UpdateHostDbReducer
// Run all threads in the pool
executor.prestartAllCoreThreads();
}
+
+ // Initialize cached counter references
+ urlLimitNotReachedCounter = context.getCounter(
+ NutchMetrics.GROUP_HOSTDB,
NutchMetrics.HOSTDB_URL_LIMIT_NOT_REACHED_TOTAL);
+ totalHostsCounter = context.getCounter(
+ NutchMetrics.GROUP_HOSTDB, NutchMetrics.HOSTDB_TOTAL_HOSTS_TOTAL);
+ skippedNotEligibleCounter = context.getCounter(
+ NutchMetrics.GROUP_HOSTDB,
NutchMetrics.HOSTDB_SKIPPED_NOT_ELIGIBLE_TOTAL);
}
/**
@@ -380,14 +394,12 @@ public class UpdateHostDbReducer
// Impose limits on minimum number of URLs?
if (urlLimit > -1l) {
if (hostDatum.numRecords() < urlLimit) {
- context.getCounter(NutchMetrics.GROUP_HOSTDB,
- NutchMetrics.HOSTDB_URL_LIMIT_NOT_REACHED_TOTAL).increment(1);
+ urlLimitNotReachedCounter.increment(1);
return;
}
}
- context.getCounter(NutchMetrics.GROUP_HOSTDB,
- NutchMetrics.HOSTDB_TOTAL_HOSTS_TOTAL).increment(1);
+ totalHostsCounter.increment(1);
// See if this record is to be checked
if (shouldCheck(hostDatum)) {
@@ -404,8 +416,7 @@ public class UpdateHostDbReducer
// Do not progress, the datum will be written in the resolver thread
return;
} else if (checkAny) {
- context.getCounter(NutchMetrics.GROUP_HOSTDB,
- NutchMetrics.HOSTDB_SKIPPED_NOT_ELIGIBLE_TOTAL).increment(1);
+ skippedNotEligibleCounter.increment(1);
LOG.debug("UpdateHostDb: {}: skipped_not_eligible", key);
}
diff --git a/src/java/org/apache/nutch/indexer/IndexerMapReduce.java
b/src/java/org/apache/nutch/indexer/IndexerMapReduce.java
index 9086a1983..b61a7f99c 100644
--- a/src/java/org/apache/nutch/indexer/IndexerMapReduce.java
+++ b/src/java/org/apache/nutch/indexer/IndexerMapReduce.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
@@ -219,6 +220,18 @@ public class IndexerMapReduce extends Configured {
// Latency tracker for indexing timing metrics
private LatencyTracker indexLatencyTracker;
+ // Cached counter references to avoid repeated lookups in hot paths
+ private Counter deletedRobotsNoIndexCounter;
+ private Counter deletedGoneCounter;
+ private Counter deletedRedirectsCounter;
+ private Counter deletedDuplicatesCounter;
+ private Counter skippedNotModifiedCounter;
+ private Counter errorsScoringFilterCounter;
+ private Counter errorsIndexingFilterCounter;
+ private Counter deletedByIndexingFilterCounter;
+ private Counter skippedByIndexingFilterCounter;
+ private Counter indexedCounter;
+
@Override
public void setup(Reducer<Text, NutchWritable, Text,
NutchIndexAction>.Context context) {
Configuration conf = context.getConfiguration();
@@ -247,6 +260,35 @@ public class IndexerMapReduce extends Configured {
// Initialize latency tracker for indexing timing
indexLatencyTracker = new LatencyTracker(
NutchMetrics.GROUP_INDEXER, NutchMetrics.INDEXER_LATENCY);
+
+ // Initialize cached counter references
+ initCounters(context);
+ }
+
+ /**
+ * Initialize cached counter references to avoid repeated lookups in hot
paths.
+ */
+ private void initCounters(Reducer<Text, NutchWritable, Text,
NutchIndexAction>.Context context) {
+ deletedRobotsNoIndexCounter = context.getCounter(
+ NutchMetrics.GROUP_INDEXER,
NutchMetrics.INDEXER_DELETED_ROBOTS_NOINDEX_TOTAL);
+ deletedGoneCounter = context.getCounter(
+ NutchMetrics.GROUP_INDEXER, NutchMetrics.INDEXER_DELETED_GONE_TOTAL);
+ deletedRedirectsCounter = context.getCounter(
+ NutchMetrics.GROUP_INDEXER,
NutchMetrics.INDEXER_DELETED_REDIRECTS_TOTAL);
+ deletedDuplicatesCounter = context.getCounter(
+ NutchMetrics.GROUP_INDEXER,
NutchMetrics.INDEXER_DELETED_DUPLICATES_TOTAL);
+ skippedNotModifiedCounter = context.getCounter(
+ NutchMetrics.GROUP_INDEXER,
NutchMetrics.INDEXER_SKIPPED_NOT_MODIFIED_TOTAL);
+ errorsScoringFilterCounter = context.getCounter(
+ NutchMetrics.GROUP_INDEXER,
NutchMetrics.INDEXER_ERRORS_SCORING_FILTER_TOTAL);
+ errorsIndexingFilterCounter = context.getCounter(
+ NutchMetrics.GROUP_INDEXER,
NutchMetrics.INDEXER_ERRORS_INDEXING_FILTER_TOTAL);
+ deletedByIndexingFilterCounter = context.getCounter(
+ NutchMetrics.GROUP_INDEXER,
NutchMetrics.INDEXER_DELETED_BY_INDEXING_FILTER_TOTAL);
+ skippedByIndexingFilterCounter = context.getCounter(
+ NutchMetrics.GROUP_INDEXER,
NutchMetrics.INDEXER_SKIPPED_BY_INDEXING_FILTER_TOTAL);
+ indexedCounter = context.getCounter(
+ NutchMetrics.GROUP_INDEXER, NutchMetrics.INDEXER_INDEXED_TOTAL);
}
@Override
@@ -299,8 +341,7 @@ public class IndexerMapReduce extends Configured {
.indexOf("noindex") != -1) {
// Delete it!
context.write(key, DELETE_ACTION);
- context.getCounter(NutchMetrics.GROUP_INDEXER,
-
NutchMetrics.INDEXER_DELETED_ROBOTS_NOINDEX_TOTAL).increment(1);
+ deletedRobotsNoIndexCounter.increment(1);
return;
}
}
@@ -317,8 +358,7 @@ public class IndexerMapReduce extends Configured {
if (delete && fetchDatum != null) {
if (fetchDatum.getStatus() == CrawlDatum.STATUS_FETCH_GONE
|| dbDatum != null && dbDatum.getStatus() ==
CrawlDatum.STATUS_DB_GONE) {
- context.getCounter(NutchMetrics.GROUP_INDEXER,
- NutchMetrics.INDEXER_DELETED_GONE_TOTAL).increment(1);
+ deletedGoneCounter.increment(1);
context.write(key, DELETE_ACTION);
return;
}
@@ -327,8 +367,7 @@ public class IndexerMapReduce extends Configured {
|| fetchDatum.getStatus() == CrawlDatum.STATUS_FETCH_REDIR_TEMP
|| dbDatum != null && dbDatum.getStatus() ==
CrawlDatum.STATUS_DB_REDIR_PERM
|| dbDatum != null && dbDatum.getStatus() ==
CrawlDatum.STATUS_DB_REDIR_TEMP) {
- context.getCounter(NutchMetrics.GROUP_INDEXER,
- NutchMetrics.INDEXER_DELETED_REDIRECTS_TOTAL).increment(1);
+ deletedRedirectsCounter.increment(1);
context.write(key, DELETE_ACTION);
return;
}
@@ -340,16 +379,14 @@ public class IndexerMapReduce extends Configured {
// Whether to delete pages marked as duplicates
if (delete && dbDatum != null && dbDatum.getStatus() ==
CrawlDatum.STATUS_DB_DUPLICATE) {
- context.getCounter(NutchMetrics.GROUP_INDEXER,
- NutchMetrics.INDEXER_DELETED_DUPLICATES_TOTAL).increment(1);
+ deletedDuplicatesCounter.increment(1);
context.write(key, DELETE_ACTION);
return;
}
// Whether to skip DB_NOTMODIFIED pages
if (skip && dbDatum != null && dbDatum.getStatus() ==
CrawlDatum.STATUS_DB_NOTMODIFIED) {
- context.getCounter(NutchMetrics.GROUP_INDEXER,
- NutchMetrics.INDEXER_SKIPPED_NOT_MODIFIED_TOTAL).increment(1);
+ skippedNotModifiedCounter.increment(1);
return;
}
@@ -379,8 +416,7 @@ public class IndexerMapReduce extends Configured {
boost = scfilters.indexerScore(key, doc, dbDatum, fetchDatum, parse,
inlinks, boost);
} catch (final ScoringFilterException e) {
- context.getCounter(NutchMetrics.GROUP_INDEXER,
- NutchMetrics.INDEXER_ERRORS_SCORING_FILTER_TOTAL).increment(1);
+ errorsScoringFilterCounter.increment(1);
LOG.warn("Error calculating score {}: {}", key, e);
return;
}
@@ -415,8 +451,7 @@ public class IndexerMapReduce extends Configured {
doc = filters.filter(doc, parse, key, fetchDatum, inlinks);
} catch (final IndexingException e) {
LOG.warn("Error indexing {}: ", key, e);
- context.getCounter(NutchMetrics.GROUP_INDEXER,
- NutchMetrics.INDEXER_ERRORS_INDEXING_FILTER_TOTAL).increment(1);
+ errorsIndexingFilterCounter.increment(1);
return;
}
@@ -426,11 +461,9 @@ public class IndexerMapReduce extends Configured {
if (deleteSkippedByIndexingFilter) {
NutchIndexAction action = new NutchIndexAction(null,
NutchIndexAction.DELETE);
context.write(key, action);
- context.getCounter(NutchMetrics.GROUP_INDEXER,
-
NutchMetrics.INDEXER_DELETED_BY_INDEXING_FILTER_TOTAL).increment(1);
+ deletedByIndexingFilterCounter.increment(1);
} else {
- context.getCounter(NutchMetrics.GROUP_INDEXER,
-
NutchMetrics.INDEXER_SKIPPED_BY_INDEXING_FILTER_TOTAL).increment(1);
+ skippedByIndexingFilterCounter.increment(1);
}
return;
}
@@ -453,8 +486,7 @@ public class IndexerMapReduce extends Configured {
// Record indexing latency
indexLatencyTracker.record(System.currentTimeMillis() - indexStart);
- context.getCounter(NutchMetrics.GROUP_INDEXER,
- NutchMetrics.INDEXER_INDEXED_TOTAL).increment(1);
+ indexedCounter.increment(1);
NutchIndexAction action = new NutchIndexAction(doc,
NutchIndexAction.ADD);
context.write(key, action);
diff --git a/src/java/org/apache/nutch/tools/warc/WARCExporter.java
b/src/java/org/apache/nutch/tools/warc/WARCExporter.java
index df4f6af05..96e8c5a97 100644
--- a/src/java/org/apache/nutch/tools/warc/WARCExporter.java
+++ b/src/java/org/apache/nutch/tools/warc/WARCExporter.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
@@ -112,6 +113,31 @@ public class WARCExporter extends Configured implements
Tool {
// Metadata to JSON
Gson gson = new Gson();
+ // Cached counter references to avoid repeated lookups in hot paths
+ private Counter missingContentCounter;
+ private Counter missingMetadataCounter;
+ private Counter omittedEmptyResponseCounter;
+ private Counter invalidUriCounter;
+ private Counter recordsGeneratedCounter;
+ private Counter exceptionCounter;
+
+ @Override
+ public void setup(Context context) {
+ // Initialize cached counter references
+ missingContentCounter = context.getCounter(
+ NutchMetrics.GROUP_WARC_EXPORTER,
NutchMetrics.WARC_MISSING_CONTENT_TOTAL);
+ missingMetadataCounter = context.getCounter(
+ NutchMetrics.GROUP_WARC_EXPORTER,
NutchMetrics.WARC_MISSING_METADATA_TOTAL);
+ omittedEmptyResponseCounter = context.getCounter(
+ NutchMetrics.GROUP_WARC_EXPORTER,
NutchMetrics.WARC_OMITTED_EMPTY_RESPONSE_TOTAL);
+ invalidUriCounter = context.getCounter(
+ NutchMetrics.GROUP_WARC_EXPORTER,
NutchMetrics.WARC_INVALID_URI_TOTAL);
+ recordsGeneratedCounter = context.getCounter(
+ NutchMetrics.GROUP_WARC_EXPORTER,
NutchMetrics.WARC_RECORDS_GENERATED_TOTAL);
+ exceptionCounter = context.getCounter(
+ NutchMetrics.GROUP_WARC_EXPORTER,
NutchMetrics.WARC_EXCEPTION_TOTAL);
+ }
+
@Override
public void reduce(Text key, Iterable<NutchWritable> values,
Context context) throws IOException, InterruptedException {
@@ -148,15 +174,13 @@ public class WARCExporter extends Configured implements
Tool {
// check that we have everything we need
if (content == null) {
LOG.info("Missing content for {}", key);
- context.getCounter(NutchMetrics.GROUP_WARC_EXPORTER,
- NutchMetrics.WARC_MISSING_CONTENT_TOTAL).increment(1);
+ missingContentCounter.increment(1);
return;
}
if (cd == null) {
LOG.info("Missing fetch datum for {}", key);
- context.getCounter(NutchMetrics.GROUP_WARC_EXPORTER,
- NutchMetrics.WARC_MISSING_METADATA_TOTAL).increment(1);
+ missingMetadataCounter.increment(1);
return;
}
@@ -164,8 +188,7 @@ public class WARCExporter extends Configured implements
Tool {
// Empty responses is everything that was not a regular response
if (!(cd.getStatus() == CrawlDatum.STATUS_FETCH_SUCCESS
|| cd.getStatus() == CrawlDatum.STATUS_FETCH_NOTMODIFIED)) {
- context.getCounter(NutchMetrics.GROUP_WARC_EXPORTER,
- NutchMetrics.WARC_OMITTED_EMPTY_RESPONSE_TOTAL).increment(1);
+ omittedEmptyResponseCounter.increment(1);
return;
}
}
@@ -240,8 +263,7 @@ public class WARCExporter extends Configured implements
Tool {
.append(uri.toASCIIString()).append(CRLF);
} catch (Exception e) {
LOG.error("Invalid URI {} ", key);
- context.getCounter(NutchMetrics.GROUP_WARC_EXPORTER,
- NutchMetrics.WARC_INVALID_URI_TOTAL).increment(1);
+ invalidUriCounter.increment(1);
return;
}
@@ -273,14 +295,12 @@ public class WARCExporter extends Configured implements
Tool {
new ByteArrayInputStream(bos.toByteArray()));
WARCRecord record = new WARCRecord(in);
context.write(NullWritable.get(), new WARCWritable(record));
- context.getCounter(NutchMetrics.GROUP_WARC_EXPORTER,
- NutchMetrics.WARC_RECORDS_GENERATED_TOTAL).increment(1);
+ recordsGeneratedCounter.increment(1);
} catch (IOException | IllegalStateException exception) {
LOG.error(
"Exception when generating WARC resource record for {} : {}",
key,
exception.getMessage());
- context.getCounter(NutchMetrics.GROUP_WARC_EXPORTER,
- NutchMetrics.WARC_EXCEPTION_TOTAL).increment(1);
+ exceptionCounter.increment(1);
}
// Do we need to emit a metadata record too?
@@ -322,8 +342,7 @@ public class WARCExporter extends Configured implements
Tool {
.append(uri.toASCIIString()).append(CRLF);
} catch (Exception e) {
LOG.error("Invalid URI {} ", key);
- context.getCounter(NutchMetrics.GROUP_WARC_EXPORTER,
- NutchMetrics.WARC_INVALID_URI_TOTAL).increment(1);
+ invalidUriCounter.increment(1);
return;
}
@@ -339,14 +358,12 @@ public class WARCExporter extends Configured implements
Tool {
new ByteArrayInputStream(bos.toByteArray()));
WARCRecord record = new WARCRecord(in);
context.write(NullWritable.get(), new WARCWritable(record));
- context.getCounter(NutchMetrics.GROUP_WARC_EXPORTER,
- NutchMetrics.WARC_RECORDS_GENERATED_TOTAL).increment(1);
+ recordsGeneratedCounter.increment(1);
} catch (IOException | IllegalStateException exception) {
LOG.error(
"Exception when generating WARC metadata record for {} : {}",
key, exception.getMessage(), exception);
- context.getCounter(NutchMetrics.GROUP_WARC_EXPORTER,
- NutchMetrics.WARC_EXCEPTION_TOTAL).increment(1);
+ exceptionCounter.increment(1);
}
}
@@ -384,8 +401,7 @@ public class WARCExporter extends Configured implements
Tool {
.append(uri.toASCIIString()).append(CRLF);
} catch (Exception e) {
LOG.error("Invalid URI {} ", key);
- context.getCounter(NutchMetrics.GROUP_WARC_EXPORTER,
- NutchMetrics.WARC_INVALID_URI_TOTAL).increment(1);
+ invalidUriCounter.increment(1);
return;
}
@@ -401,14 +417,12 @@ public class WARCExporter extends Configured implements
Tool {
new ByteArrayInputStream(bos.toByteArray()));
WARCRecord record = new WARCRecord(in);
context.write(NullWritable.get(), new WARCWritable(record));
- context.getCounter(NutchMetrics.GROUP_WARC_EXPORTER,
- NutchMetrics.WARC_RECORDS_GENERATED_TOTAL).increment(1);
+ recordsGeneratedCounter.increment(1);
} catch (IOException | IllegalStateException exception) {
LOG.error(
"Exception when generating WARC metadata record for {} : {}",
key, exception.getMessage(), exception);
- context.getCounter(NutchMetrics.GROUP_WARC_EXPORTER,
- NutchMetrics.WARC_EXCEPTION_TOTAL).increment(1);
+ exceptionCounter.increment(1);
}
}
}
diff --git a/src/java/org/apache/nutch/util/SitemapProcessor.java
b/src/java/org/apache/nutch/util/SitemapProcessor.java
index 7055a6d86..a0378ec63 100644
--- a/src/java/org/apache/nutch/util/SitemapProcessor.java
+++ b/src/java/org/apache/nutch/util/SitemapProcessor.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
@@ -114,6 +115,13 @@ public class SitemapProcessor extends Configured
implements Tool {
private CrawlDatum datum = new CrawlDatum();
private SiteMapParser parser = null;
+ // Cached counter references to avoid repeated lookups in hot paths
+ private Counter filteredRecordsCounter;
+ private Counter seedsCounter;
+ private Counter fromHostnameCounter;
+ private Counter filteredFromHostnameCounter;
+ private Counter failedFetchesCounter;
+
@Override
public void setup(Context context) {
Configuration conf = context.getConfiguration();
@@ -139,6 +147,18 @@ public class SitemapProcessor extends Configured
implements Tool {
if (normalize) {
normalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_DEFAULT);
}
+
+ // Initialize cached counter references
+ filteredRecordsCounter = context.getCounter(
+ NutchMetrics.GROUP_SITEMAP,
NutchMetrics.SITEMAP_FILTERED_RECORDS_TOTAL);
+ seedsCounter = context.getCounter(
+ NutchMetrics.GROUP_SITEMAP, NutchMetrics.SITEMAP_SEEDS_TOTAL);
+ fromHostnameCounter = context.getCounter(
+ NutchMetrics.GROUP_SITEMAP,
NutchMetrics.SITEMAP_FROM_HOSTNAME_TOTAL);
+ filteredFromHostnameCounter = context.getCounter(
+ NutchMetrics.GROUP_SITEMAP,
NutchMetrics.SITEMAP_FILTERED_FROM_HOSTNAME_TOTAL);
+ failedFetchesCounter = context.getCounter(
+ NutchMetrics.GROUP_SITEMAP,
NutchMetrics.SITEMAP_FAILED_FETCHES_TOTAL);
}
@Override
@@ -162,13 +182,11 @@ public class SitemapProcessor extends Configured
implements Tool {
url.startsWith("file:/")) {
// For entry from sitemap urls file, fetch the sitemap, extract
urls and emit those
if((url = filterNormalize(url)) == null) {
- context.getCounter(NutchMetrics.GROUP_SITEMAP,
- NutchMetrics.SITEMAP_FILTERED_RECORDS_TOTAL).increment(1);
+ filteredRecordsCounter.increment(1);
return;
}
- context.getCounter(NutchMetrics.GROUP_SITEMAP,
- NutchMetrics.SITEMAP_SEEDS_TOTAL).increment(1);
+ seedsCounter.increment(1);
generateSitemapUrlDatum(protocolFactory.getProtocol(url), url,
context);
} else {
LOG.info("generateSitemapsFromHostname: {}", key.toString());
@@ -206,8 +224,7 @@ public class SitemapProcessor extends Configured implements
Tool {
(url = filterNormalize("https://" + host + "/")) == null &&
(url = filterNormalize("ftp://" + host + "/")) == null &&
(url = filterNormalize("file:/" + host + "/")) == null) {
- context.getCounter(NutchMetrics.GROUP_SITEMAP,
- NutchMetrics.SITEMAP_FILTERED_RECORDS_TOTAL).increment(1);
+ filteredRecordsCounter.increment(1);
return;
}
// We may wish to use the robots.txt content as the third parameter
for .getRobotRules
@@ -218,12 +235,10 @@ public class SitemapProcessor extends Configured
implements Tool {
sitemaps.add(url + "sitemap.xml");
}
for (String sitemap : sitemaps) {
- context.getCounter(NutchMetrics.GROUP_SITEMAP,
- NutchMetrics.SITEMAP_FROM_HOSTNAME_TOTAL).increment(1);
+ fromHostnameCounter.increment(1);
sitemap = filterNormalize(sitemap);
if (sitemap == null) {
- context.getCounter(NutchMetrics.GROUP_SITEMAP,
-
NutchMetrics.SITEMAP_FILTERED_FROM_HOSTNAME_TOTAL).increment(1);
+ filteredFromHostnameCounter.increment(1);
} else {
generateSitemapUrlDatum(protocolFactory.getProtocol(sitemap),
sitemap, context);
@@ -259,8 +274,7 @@ public class SitemapProcessor extends Configured implements
Tool {
if(status.getCode() != ProtocolStatus.SUCCESS) {
// If there were any problems fetching the sitemap, log the error and
let it go. Not sure how often
// sitemaps are redirected. In future we might have to handle
redirects.
- context.getCounter(NutchMetrics.GROUP_SITEMAP,
- NutchMetrics.SITEMAP_FAILED_FETCHES_TOTAL).increment(1);
+ failedFetchesCounter.increment(1);
LOG.error("Error while fetching the sitemap. Status code: {} for {}",
status.getCode(), url);
return;
}
@@ -347,10 +361,20 @@ public class SitemapProcessor extends Configured
implements Tool {
private boolean overwriteExisting = false; // DO NOT ENABLE!!
+ // Cached counter references to avoid repeated lookups in hot paths
+ private Counter existingEntriesCounter;
+ private Counter newEntriesCounter;
+
@Override
public void setup(Context context) {
Configuration conf = context.getConfiguration();
this.overwriteExisting = conf.getBoolean(SITEMAP_OVERWRITE_EXISTING,
false);
+
+ // Initialize cached counter references
+ existingEntriesCounter = context.getCounter(
+ NutchMetrics.GROUP_SITEMAP,
NutchMetrics.SITEMAP_EXISTING_ENTRIES_TOTAL);
+ newEntriesCounter = context.getCounter(
+ NutchMetrics.GROUP_SITEMAP, NutchMetrics.SITEMAP_NEW_ENTRIES_TOTAL);
}
@Override
@@ -379,14 +403,12 @@ public class SitemapProcessor extends Configured
implements Tool {
originalDatum.setModifiedTime(sitemapDatum.getModifiedTime());
}
- context.getCounter(NutchMetrics.GROUP_SITEMAP,
- NutchMetrics.SITEMAP_EXISTING_ENTRIES_TOTAL).increment(1);
+ existingEntriesCounter.increment(1);
context.write(key, originalDatum);
}
else if(sitemapDatum != null) {
// For the newly discovered links via sitemap, set the status as
unfetched and emit
- context.getCounter(NutchMetrics.GROUP_SITEMAP,
- NutchMetrics.SITEMAP_NEW_ENTRIES_TOTAL).increment(1);
+ newEntriesCounter.increment(1);
sitemapDatum.setStatus(CrawlDatum.STATUS_DB_UNFETCHED);
context.write(key, sitemapDatum);
}
@@ -465,11 +487,11 @@ public class SitemapProcessor extends Configured
implements Tool {
FSUtils.replace(fs, current, tempCrawlDb, true);
LockUtil.removeLockFile(fs, lock);
- long filteredRecords = job.getCounters().findCounter("Sitemap",
"filtered_records").getValue();
- long fromHostname = job.getCounters().findCounter("Sitemap",
"sitemaps_from_hostname").getValue();
- long fromSeeds = job.getCounters().findCounter("Sitemap",
"sitemap_seeds").getValue();
- long failedFetches = job.getCounters().findCounter("Sitemap",
"failed_fetches").getValue();
- long newSitemapEntries = job.getCounters().findCounter("Sitemap",
"new_sitemap_entries").getValue();
+ long filteredRecords =
job.getCounters().findCounter(NutchMetrics.GROUP_SITEMAP,
NutchMetrics.SITEMAP_FILTERED_RECORDS_TOTAL).getValue();
+ long fromHostname =
job.getCounters().findCounter(NutchMetrics.GROUP_SITEMAP,
NutchMetrics.SITEMAP_FROM_HOSTNAME_TOTAL).getValue();
+ long fromSeeds =
job.getCounters().findCounter(NutchMetrics.GROUP_SITEMAP,
NutchMetrics.SITEMAP_SEEDS_TOTAL).getValue();
+ long failedFetches =
job.getCounters().findCounter(NutchMetrics.GROUP_SITEMAP,
NutchMetrics.SITEMAP_FAILED_FETCHES_TOTAL).getValue();
+ long newSitemapEntries =
job.getCounters().findCounter(NutchMetrics.GROUP_SITEMAP,
NutchMetrics.SITEMAP_NEW_ENTRIES_TOTAL).getValue();
LOG.info("SitemapProcessor: Total records rejected by filters: {}",
filteredRecords);
LOG.info("SitemapProcessor: Total sitemaps from host name: {}",
fromHostname);