This is an automated email from the ASF dual-hosted git repository.
lewismc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nutch.git
The following commit(s) were added to refs/heads/master by this push:
new 1242e22ba NUTCH-3142 Add Error Context to Metrics (#882)
1242e22ba is described below
commit 1242e22ba44ac743de76c39a35270427c2fee1e4
Author: Lewis John McGibbney <[email protected]>
AuthorDate: Thu Feb 5 14:44:03 2026 -0800
NUTCH-3142 Add Error Context to Metrics (#882)
---
ivy/ivy.xml | 3 +
.../org/apache/nutch/crawl/CrawlDbReducer.java | 7 +
src/java/org/apache/nutch/crawl/Generator.java | 14 +-
src/java/org/apache/nutch/crawl/Injector.java | 5 +
.../org/apache/nutch/fetcher/FetcherThread.java | 28 +-
.../org/apache/nutch/hostdb/ResolverThread.java | 14 +
.../apache/nutch/hostdb/UpdateHostDbMapper.java | 9 +-
.../org/apache/nutch/indexer/IndexerMapReduce.java | 16 +-
.../org/apache/nutch/metrics/ErrorTracker.java | 383 +++++++++++++++
.../org/apache/nutch/metrics/NutchMetrics.java | 81 +++-
src/java/org/apache/nutch/parse/ParseSegment.java | 6 +
.../org/apache/nutch/tools/warc/WARCExporter.java | 22 +-
.../org/apache/nutch/util/SitemapProcessor.java | 6 +
.../org/apache/nutch/metrics/TestErrorTracker.java | 514 +++++++++++++++++++++
14 files changed, 1050 insertions(+), 58 deletions(-)
diff --git a/ivy/ivy.xml b/ivy/ivy.xml
index 44914368e..9b38d2fa9 100644
--- a/ivy/ivy.xml
+++ b/ivy/ivy.xml
@@ -122,6 +122,9 @@
<!-- Required for JUnit 5 (Jupiter) test execution -->
<dependency org="org.junit.jupiter" name="junit-jupiter-engine"
rev="5.14.1" conf="test->default"/>
<dependency org="org.junit.jupiter" name="junit-jupiter-api"
rev="5.14.1" conf="test->default"/>
+ <!-- Mockito for mocking in tests -->
+ <dependency org="org.mockito" name="mockito-core" rev="5.18.0"
conf="test->default"/>
+ <dependency org="org.mockito" name="mockito-junit-jupiter"
rev="5.18.0" conf="test->default"/>
<!-- Jetty used to serve test pages for unit tests, but is also
provided as dependency of Hadoop -->
<dependency org="org.eclipse.jetty" name="jetty-server"
rev="12.1.5" conf="test->default">
diff --git a/src/java/org/apache/nutch/crawl/CrawlDbReducer.java
b/src/java/org/apache/nutch/crawl/CrawlDbReducer.java
index e263f8463..3ba173447 100644
--- a/src/java/org/apache/nutch/crawl/CrawlDbReducer.java
+++ b/src/java/org/apache/nutch/crawl/CrawlDbReducer.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.PriorityQueue;
import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.metrics.ErrorTracker;
import org.apache.nutch.metrics.NutchMetrics;
import org.apache.nutch.scoring.ScoringFilterException;
import org.apache.nutch.scoring.ScoringFilters;
@@ -49,6 +50,7 @@ public class CrawlDbReducer extends
private boolean additionsAllowed;
private int maxInterval;
private FetchSchedule schedule;
+ private ErrorTracker errorTracker;
@Override
public void setup(Reducer<Text, CrawlDatum, Text, CrawlDatum>.Context
context) {
@@ -60,6 +62,8 @@ public class CrawlDbReducer extends
schedule = FetchScheduleFactory.getFetchSchedule(conf);
int maxLinks = conf.getInt("db.update.max.inlinks", 10000);
linked = new InlinkPriorityQueue(maxLinks);
+ // Initialize error tracker with cached counters
+ errorTracker = new ErrorTracker(NutchMetrics.GROUP_CRAWLDB, context);
}
@Override
@@ -162,6 +166,7 @@ public class CrawlDbReducer extends
scfilters.orphanedScore(key, old);
} catch (ScoringFilterException e) {
LOG.warn("Couldn't update orphaned score, key={}: {}", key, e);
+ errorTracker.incrementCounters(e);
}
context.write(key, old);
// Dynamic counter based on status name
@@ -208,6 +213,7 @@ public class CrawlDbReducer extends
} catch (ScoringFilterException e) {
LOG.warn("Cannot filter init score for url {}, using default: {}",
key, e.getMessage());
+ errorTracker.incrementCounters(e);
result.setScore(0.0f);
}
}
@@ -317,6 +323,7 @@ public class CrawlDbReducer extends
scfilters.updateDbScore(key, oldSet ? old : null, result, linkList);
} catch (Exception e) {
LOG.warn("Couldn't update score, key={}: {}", key, e);
+ errorTracker.incrementCounters(e);
}
// remove generation time, if any
result.getMetaData().remove(Nutch.WRITABLE_GENERATE_TIME_KEY);
diff --git a/src/java/org/apache/nutch/crawl/Generator.java
b/src/java/org/apache/nutch/crawl/Generator.java
index db15f0426..456ba689a 100644
--- a/src/java/org/apache/nutch/crawl/Generator.java
+++ b/src/java/org/apache/nutch/crawl/Generator.java
@@ -67,6 +67,7 @@ import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.nutch.hostdb.HostDatum;
import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.metrics.ErrorTracker;
import org.apache.nutch.metrics.NutchMetrics;
import org.apache.nutch.net.URLFilterException;
import org.apache.nutch.net.URLFilters;
@@ -191,6 +192,7 @@ public class Generator extends NutchTool implements Tool {
private int intervalThreshold = -1;
private byte restrictStatus = -1;
private JexlScript expr = null;
+ private ErrorTracker errorTracker;
@Override
public void setup(
@@ -215,6 +217,8 @@ public class Generator extends NutchTool implements Tool {
restrictStatus = CrawlDatum.getStatusByName(restrictStatusString);
}
expr = JexlUtil.parseExpression(conf.get(GENERATOR_EXPR, null));
+ // Initialize error tracker with cached counters
+ errorTracker = new ErrorTracker(NutchMetrics.GROUP_GENERATOR, context);
}
@Override
@@ -231,8 +235,7 @@ public class Generator extends NutchTool implements Tool {
return;
}
} catch (URLFilterException e) {
- context.getCounter(NutchMetrics.GROUP_GENERATOR,
- NutchMetrics.GENERATOR_URL_FILTER_EXCEPTION_TOTAL).increment(1);
+ errorTracker.incrementCounters(e);
LOG.warn("Couldn't filter url: {} ({})", url, e.getMessage());
}
}
@@ -261,6 +264,7 @@ public class Generator extends NutchTool implements Tool {
try {
sort = scfilters.generatorSortValue(key, crawlDatum, sort);
} catch (ScoringFilterException sfe) {
+ errorTracker.incrementCounters(sfe);
LOG.warn("Couldn't filter generatorSortValue for {}: {}", key, sfe);
}
@@ -326,6 +330,7 @@ public class Generator extends NutchTool implements Tool {
private JexlScript maxCountExpr = null;
private JexlScript fetchDelayExpr = null;
private Map<String, HostDatum> hostDatumCache = new HashMap<>();
+ private ErrorTracker errorTracker;
public void readHostDb() throws IOException {
if (conf.get(GENERATOR_HOSTDB) == null) {
@@ -419,6 +424,8 @@ public class Generator extends NutchTool implements Tool {
fetchDelayExpr = JexlUtil
.parseExpression(conf.get(GENERATOR_FETCH_DELAY_EXPR, null));
}
+ // Initialize error tracker with cached counters
+ errorTracker = new ErrorTracker(NutchMetrics.GROUP_GENERATOR, context);
readHostDb();
}
@@ -516,8 +523,7 @@ public class Generator extends NutchTool implements Tool {
} catch (MalformedURLException e) {
LOG.warn("Malformed URL: '{}', skipping ({})", urlString,
StringUtils.stringifyException(e));
- context.getCounter(NutchMetrics.GROUP_GENERATOR,
- NutchMetrics.GENERATOR_MALFORMED_URL_TOTAL).increment(1);
+ errorTracker.incrementCounters(e);
continue;
}
diff --git a/src/java/org/apache/nutch/crawl/Injector.java
b/src/java/org/apache/nutch/crawl/Injector.java
index 03a54f1eb..f84366c2c 100644
--- a/src/java/org/apache/nutch/crawl/Injector.java
+++ b/src/java/org/apache/nutch/crawl/Injector.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.metrics.ErrorTracker;
import org.apache.nutch.metrics.NutchMetrics;
import org.apache.nutch.net.URLFilters;
import org.apache.nutch.net.URLNormalizers;
@@ -127,6 +128,7 @@ public class Injector extends NutchTool implements Tool {
private boolean url404Purging;
private String scope;
private boolean filterNormalizeAll = false;
+ private ErrorTracker errorTracker;
@Override
public void setup(Context context) {
@@ -147,6 +149,8 @@ public class Injector extends NutchTool implements Tool {
curTime = conf.getLong("injector.current.time",
System.currentTimeMillis());
url404Purging = conf.getBoolean(CrawlDb.CRAWLDB_PURGE_404, false);
+ // Initialize error tracker with cached counters
+ errorTracker = new ErrorTracker(NutchMetrics.GROUP_INJECTOR, context);
}
/* Filter and normalize the input url */
@@ -239,6 +243,7 @@ public class Injector extends NutchTool implements Tool {
LOG.warn(
"Cannot filter injected score for url {}, using default ({})",
url, e.getMessage());
+ errorTracker.incrementCounters(e);
}
context.getCounter(NutchMetrics.GROUP_INJECTOR,
NutchMetrics.INJECTOR_URLS_INJECTED_TOTAL).increment(1);
diff --git a/src/java/org/apache/nutch/fetcher/FetcherThread.java
b/src/java/org/apache/nutch/fetcher/FetcherThread.java
index 2abcfe9f6..d367a4f29 100644
--- a/src/java/org/apache/nutch/fetcher/FetcherThread.java
+++ b/src/java/org/apache/nutch/fetcher/FetcherThread.java
@@ -38,6 +38,7 @@ import org.apache.nutch.crawl.NutchWritable;
import org.apache.nutch.crawl.SignatureFactory;
import org.apache.nutch.fetcher.Fetcher.FetcherRun;
import org.apache.nutch.fetcher.FetcherThreadEvent.PublishEventType;
+import org.apache.nutch.metrics.ErrorTracker;
import org.apache.nutch.metrics.LatencyTracker;
import org.apache.nutch.metrics.NutchMetrics;
import org.apache.nutch.metadata.Metadata;
@@ -170,6 +171,9 @@ public class FetcherThread extends Thread {
// Latency tracker for fetch timing metrics
private LatencyTracker fetchLatencyTracker;
+ // Error tracker for categorized error metrics
+ private ErrorTracker errorTracker;
+
public FetcherThread(Configuration conf, AtomicInteger activeThreads,
FetchItemQueues fetchQueues,
QueueFeeder feeder, AtomicInteger spinWaiting, AtomicLong
lastRequestStart, FetcherRun.Context context,
AtomicInteger errors, String segmentName, boolean parsing, boolean
storingContent,
@@ -292,6 +296,9 @@ public class FetcherThread extends Thread {
// Initialize latency tracker for fetch timing
fetchLatencyTracker = new LatencyTracker(
NutchMetrics.GROUP_FETCHER, NutchMetrics.FETCHER_LATENCY);
+
+ // Initialize error tracker for categorized error metrics
+ errorTracker = new ErrorTracker(NutchMetrics.GROUP_FETCHER);
}
@Override
@@ -548,15 +555,7 @@ public class FetcherThread extends Thread {
} catch (Throwable t) { // unexpected exception
// unblock
fetchQueues.finishFetchItem(fit);
- String message;
- if (LOG.isDebugEnabled()) {
- message = StringUtils.stringifyException(t);
- } else if (logUtil.logShort(t)) {
- message = t.getClass().getName();
- } else {
- message = StringUtils.stringifyException(t);
- }
- logError(fit.url, message);
+ logError(fit.url, t);
output(fit.url, fit.datum, null, ProtocolStatus.STATUS_FAILED,
CrawlDatum.STATUS_FETCH_RETRY);
}
@@ -570,6 +569,8 @@ public class FetcherThread extends Thread {
}
// Emit fetch latency metrics
fetchLatencyTracker.emitCounters(context);
+ // Emit error metrics
+ errorTracker.emitCounters(context);
activeThreads.decrementAndGet(); // count threads
LOG.info("{} {} -finishing thread {}, activeThreads={}", getName(),
Thread.currentThread().getId(), getName(), activeThreads);
@@ -689,10 +690,19 @@ public class FetcherThread extends Thread {
return fit;
}
+ private void logError(Text url, Throwable t) {
+ String message = t.getClass().getName() + ": " + t.getMessage();
+ LOG.info("{} {} fetch of {} failed with: {}", getName(),
+ Thread.currentThread().getId(), url, message);
+ errors.incrementAndGet();
+ errorTracker.recordError(t);
+ }
+
private void logError(Text url, String message) {
LOG.info("{} {} fetch of {} failed with: {}", getName(),
Thread.currentThread().getId(), url, message);
errors.incrementAndGet();
+ errorTracker.recordError(ErrorTracker.ErrorType.OTHER);
}
private ParseStatus output(Text key, CrawlDatum datum, Content content,
diff --git a/src/java/org/apache/nutch/hostdb/ResolverThread.java
b/src/java/org/apache/nutch/hostdb/ResolverThread.java
index 2690a73fa..4c42c02b4 100644
--- a/src/java/org/apache/nutch/hostdb/ResolverThread.java
+++ b/src/java/org/apache/nutch/hostdb/ResolverThread.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.util.StringUtils;
+import org.apache.nutch.metrics.ErrorTracker;
import org.apache.nutch.metrics.NutchMetrics;
import org.slf4j.Logger;
@@ -124,11 +125,24 @@ public class ResolverThread implements Runnable {
// Dynamic counter based on failure count - can't cache
context.getCounter(NutchMetrics.GROUP_HOSTDB,
createFailureCounterLabel(datum)).increment(1);
+ // Common error counters for consistency
+ context.getCounter(NutchMetrics.GROUP_HOSTDB,
+ NutchMetrics.ERROR_TOTAL).increment(1);
+ context.getCounter(NutchMetrics.GROUP_HOSTDB,
+ NutchMetrics.ERROR_NETWORK_TOTAL).increment(1);
} catch (Exception ioe) {
LOG.warn(StringUtils.stringifyException(ioe));
+ context.getCounter(NutchMetrics.GROUP_HOSTDB,
+ NutchMetrics.ERROR_TOTAL).increment(1);
+ context.getCounter(NutchMetrics.GROUP_HOSTDB,
+ ErrorTracker.getCounterName(ioe)).increment(1);
}
} catch (Exception e) {
LOG.warn(StringUtils.stringifyException(e));
+ context.getCounter(NutchMetrics.GROUP_HOSTDB,
+ NutchMetrics.ERROR_TOTAL).increment(1);
+ context.getCounter(NutchMetrics.GROUP_HOSTDB,
+ ErrorTracker.getCounterName(e)).increment(1);
}
context.getCounter(NutchMetrics.GROUP_HOSTDB,
diff --git a/src/java/org/apache/nutch/hostdb/UpdateHostDbMapper.java
b/src/java/org/apache/nutch/hostdb/UpdateHostDbMapper.java
index 8de2dcdf2..10a08d55a 100644
--- a/src/java/org/apache/nutch/hostdb/UpdateHostDbMapper.java
+++ b/src/java/org/apache/nutch/hostdb/UpdateHostDbMapper.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.crawl.NutchWritable;
import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.metrics.ErrorTracker;
import org.apache.nutch.metrics.NutchMetrics;
import org.apache.nutch.net.URLFilters;
import org.apache.nutch.net.URLNormalizers;
@@ -63,8 +64,8 @@ public class UpdateHostDbMapper
protected URLNormalizers normalizers = null;
// Cached counter references to avoid repeated lookups in hot paths
- protected Counter malformedUrlCounter;
protected Counter filteredRecordsCounter;
+ protected ErrorTracker errorTracker;
@Override
public void setup(Mapper<Text, Writable, Text, NutchWritable>.Context
context) {
@@ -79,10 +80,10 @@ public class UpdateHostDbMapper
normalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_DEFAULT);
// Initialize cached counter references
- malformedUrlCounter = context.getCounter(
- NutchMetrics.GROUP_HOSTDB, NutchMetrics.HOSTDB_MALFORMED_URL_TOTAL);
filteredRecordsCounter = context.getCounter(
NutchMetrics.GROUP_HOSTDB, NutchMetrics.HOSTDB_FILTERED_RECORDS_TOTAL);
+ // Initialize error tracker with cached counters
+ errorTracker = new ErrorTracker(NutchMetrics.GROUP_HOSTDB, context);
}
/**
@@ -148,7 +149,7 @@ public class UpdateHostDbMapper
try {
url = new URL(keyStr);
} catch (MalformedURLException e) {
- malformedUrlCounter.increment(1);
+ errorTracker.incrementCounters(e);
return;
}
String hostName = URLUtil.getHost(url);
diff --git a/src/java/org/apache/nutch/indexer/IndexerMapReduce.java
b/src/java/org/apache/nutch/indexer/IndexerMapReduce.java
index b61a7f99c..50da12b8a 100644
--- a/src/java/org/apache/nutch/indexer/IndexerMapReduce.java
+++ b/src/java/org/apache/nutch/indexer/IndexerMapReduce.java
@@ -41,6 +41,7 @@ import org.apache.nutch.crawl.CrawlDb;
import org.apache.nutch.crawl.Inlinks;
import org.apache.nutch.crawl.LinkDb;
import org.apache.nutch.crawl.NutchWritable;
+import org.apache.nutch.metrics.ErrorTracker;
import org.apache.nutch.metrics.LatencyTracker;
import org.apache.nutch.metrics.NutchMetrics;
import org.apache.nutch.metadata.Metadata;
@@ -226,11 +227,12 @@ public class IndexerMapReduce extends Configured {
private Counter deletedRedirectsCounter;
private Counter deletedDuplicatesCounter;
private Counter skippedNotModifiedCounter;
- private Counter errorsScoringFilterCounter;
- private Counter errorsIndexingFilterCounter;
private Counter deletedByIndexingFilterCounter;
private Counter skippedByIndexingFilterCounter;
private Counter indexedCounter;
+
+ // Error tracker with cached counters
+ private ErrorTracker errorTracker;
@Override
public void setup(Reducer<Text, NutchWritable, Text,
NutchIndexAction>.Context context) {
@@ -279,16 +281,14 @@ public class IndexerMapReduce extends Configured {
NutchMetrics.GROUP_INDEXER,
NutchMetrics.INDEXER_DELETED_DUPLICATES_TOTAL);
skippedNotModifiedCounter = context.getCounter(
NutchMetrics.GROUP_INDEXER,
NutchMetrics.INDEXER_SKIPPED_NOT_MODIFIED_TOTAL);
- errorsScoringFilterCounter = context.getCounter(
- NutchMetrics.GROUP_INDEXER,
- NutchMetrics.INDEXER_ERRORS_SCORING_FILTER_TOTAL);
- errorsIndexingFilterCounter = context.getCounter(
- NutchMetrics.GROUP_INDEXER,
- NutchMetrics.INDEXER_ERRORS_INDEXING_FILTER_TOTAL);
deletedByIndexingFilterCounter = context.getCounter(
NutchMetrics.GROUP_INDEXER,
NutchMetrics.INDEXER_DELETED_BY_INDEXING_FILTER_TOTAL);
skippedByIndexingFilterCounter = context.getCounter(
NutchMetrics.GROUP_INDEXER,
NutchMetrics.INDEXER_SKIPPED_BY_INDEXING_FILTER_TOTAL);
indexedCounter = context.getCounter(
NutchMetrics.GROUP_INDEXER, NutchMetrics.INDEXER_INDEXED_TOTAL);
+ // Initialize error tracker with cached counters
+ errorTracker = new ErrorTracker(NutchMetrics.GROUP_INDEXER, context);
}
@Override
@@ -416,7 +416,7 @@ public class IndexerMapReduce extends Configured {
boost = scfilters.indexerScore(key, doc, dbDatum, fetchDatum, parse,
inlinks, boost);
} catch (final ScoringFilterException e) {
- errorsScoringFilterCounter.increment(1);
+ errorTracker.incrementCounters(e);
LOG.warn("Error calculating score {}: {}", key, e);
return;
}
@@ -451,7 +451,7 @@ public class IndexerMapReduce extends Configured {
doc = filters.filter(doc, parse, key, fetchDatum, inlinks);
} catch (final IndexingException e) {
LOG.warn("Error indexing {}: ", key, e);
- errorsIndexingFilterCounter.increment(1);
+ errorTracker.incrementCounters(e);
return;
}
diff --git a/src/java/org/apache/nutch/metrics/ErrorTracker.java
b/src/java/org/apache/nutch/metrics/ErrorTracker.java
new file mode 100644
index 000000000..192107160
--- /dev/null
+++ b/src/java/org/apache/nutch/metrics/ErrorTracker.java
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.metrics;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.net.UnknownHostException;
+import java.util.EnumMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+/**
+ * A utility class for tracking errors by category with automatic
classification.
+ *
+ * <p>This class provides thread-safe error counting with automatic
categorization
+ * based on exception type. It uses a bounded set of error categories to stay
within
+ * Hadoop's counter limits (~120 counters).
+ *
+ * <p>Usage:
+ * <pre>
+ * // In mapper/reducer setup or thread initialization
+ * errorTracker = new ErrorTracker(NutchMetrics.GROUP_FETCHER);
+ *
+ * // When catching exceptions
+ * try {
+ * // ... operation ...
+ * } catch (Exception e) {
+ * errorTracker.recordError(e); // Auto-categorizes
+ * }
+ *
+ * // Or with manual categorization
+ * errorTracker.recordError(ErrorTracker.ErrorType.NETWORK);
+ *
+ * // In cleanup - emit all error counters
+ * errorTracker.emitCounters(context);
+ * </pre>
+ *
+ * <p>Emits the following counters:
+ * <ul>
+ * <li>errors_total - total number of errors across all categories</li>
+ * <li>errors_network_total - network-related errors</li>
+ * <li>errors_protocol_total - protocol errors</li>
+ * <li>errors_parsing_total - parsing errors</li>
+ * <li>errors_url_total - URL-related errors</li>
+ * <li>errors_scoring_total - scoring filter errors</li>
+ * <li>errors_indexing_total - indexing filter errors</li>
+ * <li>errors_timeout_total - timeout errors</li>
+ * <li>errors_other_total - uncategorized errors</li>
+ * </ul>
+ *
+ * @since 1.22
+ */
+public class ErrorTracker {
+
+ /**
+ * Error type categories for classification.
+ * Uses a bounded set to stay within Hadoop's counter limits.
+ */
+ public enum ErrorType {
+ /** Network-related errors (IOException, SocketException, etc.) */
+ NETWORK,
+ /** Protocol errors (ProtocolException, ProtocolNotFound) */
+ PROTOCOL,
+ /** Parsing errors (ParseException, ParserNotFound) */
+ PARSING,
+ /** URL-related errors (MalformedURLException, URLFilterException) */
+ URL,
+ /** Scoring filter errors */
+ SCORING,
+ /** Indexing filter errors */
+ INDEXING,
+ /** Timeout errors (SocketTimeoutException) */
+ TIMEOUT,
+ /** Other uncategorized errors */
+ OTHER
+ }
+
+ private final String group;
+ private final Map<ErrorType, AtomicLong> counts;
+ private final AtomicLong totalCount;
+
+ // Cached counter references for performance (optional - set via
initCounters)
+ private org.apache.hadoop.mapreduce.Counter cachedTotalCounter;
+ private final Map<ErrorType, org.apache.hadoop.mapreduce.Counter>
cachedCounters;
+
+ /**
+ * Creates a new ErrorTracker for the specified counter group.
+ *
+ * <p>This constructor creates an ErrorTracker without cached counters.
+ * Call {@link #initCounters(TaskInputOutputContext)} in setup() to cache
+ * counter references for better performance.
+ *
+ * @param group the Hadoop counter group name (e.g.,
NutchMetrics.GROUP_FETCHER)
+ */
+ public ErrorTracker(String group) {
+ this.group = group;
+ this.counts = new EnumMap<>(ErrorType.class);
+ this.cachedCounters = new EnumMap<>(ErrorType.class);
+ this.totalCount = new AtomicLong(0);
+
+ // Initialize all counts to 0
+ for (ErrorType type : ErrorType.values()) {
+ counts.put(type, new AtomicLong(0));
+ }
+ }
+
+ /**
+ * Creates a new ErrorTracker with cached counter references.
+ *
+ * <p>This constructor caches all counter references at creation time,
+ * avoiding repeated counter lookups in hot paths.
+ *
+ * @param group the Hadoop counter group name
+ * @param context the Hadoop task context for caching counters
+ */
+ public ErrorTracker(String group, TaskInputOutputContext<?, ?, ?, ?>
context) {
+ this(group);
+ initCounters(context);
+ }
+
+ /**
+ * Initializes cached counter references from the Hadoop context.
+ *
+ * <p>Call this method in the mapper/reducer setup() method to cache
+ * counter references and avoid repeated lookups during processing.
+ *
+ * @param context the Hadoop task context
+ */
+ public void initCounters(TaskInputOutputContext<?, ?, ?, ?> context) {
+ cachedTotalCounter = context.getCounter(group, NutchMetrics.ERROR_TOTAL);
+ for (ErrorType type : ErrorType.values()) {
+ cachedCounters.put(type, context.getCounter(group,
getCounterName(type)));
+ }
+ }
+
+ /**
+ * Records an error with automatic categorization based on the throwable
type.
+ *
+ * @param t the throwable to categorize and record
+ */
+ public void recordError(Throwable t) {
+ recordError(categorize(t));
+ }
+
+ /**
+ * Records an error with explicit category.
+ *
+ * @param type the error type category
+ */
+ public void recordError(ErrorType type) {
+ counts.get(type).incrementAndGet();
+ totalCount.incrementAndGet();
+ }
+
+ /**
+ * Returns the count for a specific error type.
+ *
+ * @param type the error type
+ * @return the count for that error type
+ */
+ public long getCount(ErrorType type) {
+ return counts.get(type).get();
+ }
+
+ /**
+ * Returns the total count of all errors.
+ *
+ * @return the total error count
+ */
+ public long getTotalCount() {
+ return totalCount.get();
+ }
+
+ /**
+ * Emits all error counters to the Hadoop context.
+ *
+ * <p>Should be called once during cleanup to emit aggregated metrics.
+ * Only emits counters for error types that have non-zero counts.
+ *
+ * <p>If counters were cached via {@link
#initCounters(TaskInputOutputContext)},
+ * uses the cached references for better performance.
+ *
+ * @param context the Hadoop task context
+ */
+ public void emitCounters(TaskInputOutputContext<?, ?, ?, ?> context) {
+ // Use cached counters if available, otherwise look up
+ if (cachedTotalCounter != null) {
+ cachedTotalCounter.increment(totalCount.get());
+ for (ErrorType type : ErrorType.values()) {
+ long count = counts.get(type).get();
+ if (count > 0) {
+ cachedCounters.get(type).increment(count);
+ }
+ }
+ } else {
+ // Fallback to direct lookup
+ context.getCounter(group,
NutchMetrics.ERROR_TOTAL).increment(totalCount.get());
+ for (ErrorType type : ErrorType.values()) {
+ long count = counts.get(type).get();
+ if (count > 0) {
+ context.getCounter(group, getCounterName(type)).increment(count);
+ }
+ }
+ }
+ }
+
+ /**
+ * Directly increments cached error counters without local accumulation.
+ *
+ * <p>Use this method when you want to immediately update Hadoop counters
+ * rather than accumulating locally and emitting in cleanup.
+ * Requires {@link #initCounters(TaskInputOutputContext)} to have been
called.
+ *
+ * @param t the throwable to categorize and count
+ * @throws IllegalStateException if counters have not been initialized
+ */
+ public void incrementCounters(Throwable t) {
+ incrementCounters(categorize(t));
+ }
+
+ /**
+ * Directly increments cached error counters without local accumulation.
+ *
+ * <p>Use this method when you want to immediately update Hadoop counters
+ * rather than accumulating locally and emitting in cleanup.
+ * Requires {@link #initCounters(TaskInputOutputContext)} to have been
called.
+ *
+ * @param type the error type to count
+ * @throws IllegalStateException if counters have not been initialized
+ */
+ public void incrementCounters(ErrorType type) {
+ if (cachedTotalCounter == null) {
+ throw new IllegalStateException(
+ "Counters not initialized. Call initCounters() first.");
+ }
+ cachedTotalCounter.increment(1);
+ cachedCounters.get(type).increment(1);
+ }
+
+ /**
+ * Categorizes a throwable into an error type.
+ *
+ * <p>The categorization checks the exception class hierarchy to determine
+ * the most appropriate category. Timeout exceptions are checked first as
+ * they are a subclass of IOException.
+ *
+ * @param t the throwable to categorize
+ * @return the appropriate ErrorType for the throwable
+ */
+ public static ErrorType categorize(Throwable t) {
+ if (t == null) {
+ return ErrorType.OTHER;
+ }
+
+ String className = t.getClass().getName();
+
+ // Check for timeout first (before general IOException)
+ if (t instanceof SocketTimeoutException
+ || className.contains("TimeoutException")
+ || className.contains("Timeout")) {
+ return ErrorType.TIMEOUT;
+ }
+
+ // Network errors
+ if (t instanceof SocketException
+ || t instanceof UnknownHostException
+ || className.contains("ConnectException")
+ || className.contains("NoRouteToHostException")
+ || className.contains("ConnectionRefusedException")) {
+ return ErrorType.NETWORK;
+ }
+
+ // URL errors (check before general IOException since
MalformedURLException extends IOException)
+ if (t instanceof MalformedURLException
+ || className.contains("URLFilterException")
+ || className.contains("URISyntaxException")) {
+ return ErrorType.URL;
+ }
+
+ // General IOException (but not the specific subtypes above)
+ if (t instanceof IOException) {
+ return ErrorType.NETWORK;
+ }
+
+ // Protocol errors
+ if (className.contains("ProtocolException")
+ || className.contains("ProtocolNotFound")) {
+ return ErrorType.PROTOCOL;
+ }
+
+ // Parsing errors
+ if (className.contains("ParseException")
+ || className.contains("ParserNotFound")
+ || className.contains("SAXException")
+ || className.contains("ParserConfigurationException")) {
+ return ErrorType.PARSING;
+ }
+
+ // Scoring errors
+ if (className.contains("ScoringFilterException")) {
+ return ErrorType.SCORING;
+ }
+
+ // Indexing errors
+ if (className.contains("IndexingException")) {
+ return ErrorType.INDEXING;
+ }
+
+ // Check cause chain for more specific categorization
+ Throwable cause = t.getCause();
+ if (cause != null && cause != t) {
+ ErrorType causeType = categorize(cause);
+ if (causeType != ErrorType.OTHER) {
+ return causeType;
+ }
+ }
+
+ return ErrorType.OTHER;
+ }
+
+ /**
+ * Gets the counter name constant for a given error type.
+ *
+ * @param type the error type
+ * @return the counter name constant from NutchMetrics
+ */
+ public static String getCounterName(ErrorType type) {
+ switch (type) {
+ case NETWORK:
+ return NutchMetrics.ERROR_NETWORK_TOTAL;
+ case PROTOCOL:
+ return NutchMetrics.ERROR_PROTOCOL_TOTAL;
+ case PARSING:
+ return NutchMetrics.ERROR_PARSING_TOTAL;
+ case URL:
+ return NutchMetrics.ERROR_URL_TOTAL;
+ case SCORING:
+ return NutchMetrics.ERROR_SCORING_TOTAL;
+ case INDEXING:
+ return NutchMetrics.ERROR_INDEXING_TOTAL;
+ case TIMEOUT:
+ return NutchMetrics.ERROR_TIMEOUT_TOTAL;
+ case OTHER:
+ default:
+ return NutchMetrics.ERROR_OTHER_TOTAL;
+ }
+ }
+
+ /**
+ * Gets the counter name for a throwable based on its categorization.
+ *
+ * <p>This is a convenience method for direct use in catch blocks:
+ * <pre>
+ * } catch (Exception e) {
+ * context.getCounter(group,
ErrorTracker.getCounterName(e)).increment(1);
+ * }
+ * </pre>
+ *
+ * @param t the throwable to get the counter name for
+ * @return the counter name constant from NutchMetrics
+ */
+ public static String getCounterName(Throwable t) {
+ return getCounterName(categorize(t));
+ }
+}
diff --git a/src/java/org/apache/nutch/metrics/NutchMetrics.java
b/src/java/org/apache/nutch/metrics/NutchMetrics.java
index dea34be7f..c65a4f0ce 100644
--- a/src/java/org/apache/nutch/metrics/NutchMetrics.java
+++ b/src/java/org/apache/nutch/metrics/NutchMetrics.java
@@ -145,9 +145,6 @@ public final class NutchMetrics {
/** URLs rejected by URL filters. */
public static final String GENERATOR_URL_FILTERS_REJECTED_TOTAL =
"url_filters_rejected_total";
- /** URL filter exceptions. */
- public static final String GENERATOR_URL_FILTER_EXCEPTION_TOTAL =
"url_filter_exception_total";
-
/** URLs rejected by fetch schedule. */
public static final String GENERATOR_SCHEDULE_REJECTED_TOTAL =
"schedule_rejected_total";
@@ -166,9 +163,6 @@ public final class NutchMetrics {
/** URLs rejected due to fetch interval exceeding threshold. */
public static final String GENERATOR_INTERVAL_REJECTED_TOTAL =
"interval_rejected_total";
- /** Malformed URLs encountered. */
- public static final String GENERATOR_MALFORMED_URL_TOTAL =
"malformed_url_total";
-
/** URLs skipped due to per-host overflow. */
public static final String GENERATOR_URLS_SKIPPED_PER_HOST_OVERFLOW_TOTAL =
"urls_skipped_per_host_overflow_total";
@@ -200,12 +194,6 @@ public final class NutchMetrics {
/** Documents skipped by indexing filter. */
public static final String INDEXER_SKIPPED_BY_INDEXING_FILTER_TOTAL =
"skipped_by_indexing_filter_total";
- /** Scoring filter errors. */
- public static final String INDEXER_ERRORS_SCORING_FILTER_TOTAL =
"errors_scoring_filter_total";
-
- /** Indexing filter errors. */
- public static final String INDEXER_ERRORS_INDEXING_FILTER_TOTAL =
"errors_indexing_filter_total";
-
/** Documents indexed (added or updated). */
public static final String INDEXER_INDEXED_TOTAL = "indexed_total";
@@ -248,9 +236,6 @@ public final class NutchMetrics {
// HostDb Counters
// =========================================================================
- /** Malformed URLs in HostDb. */
- public static final String HOSTDB_MALFORMED_URL_TOTAL = "malformed_url_total";
-
/** Records filtered in HostDb. */
public static final String HOSTDB_FILTERED_RECORDS_TOTAL = "filtered_records_total";
@@ -346,15 +331,9 @@ public final class NutchMetrics {
/** Omitted empty responses in WARC export. */
public static final String WARC_OMITTED_EMPTY_RESPONSE_TOTAL = "omitted_empty_response_total";
- /** Invalid URIs in WARC export. */
- public static final String WARC_INVALID_URI_TOTAL = "invalid_uri_total";
-
/** WARC records generated. */
public static final String WARC_RECORDS_GENERATED_TOTAL = "records_generated_total";
- /** Exceptions during WARC export. */
- public static final String WARC_EXCEPTION_TOTAL = "exception_total";
-
// =========================================================================
// Domain Statistics Counters (enum-based, kept for compatibility)
// =========================================================================
@@ -389,5 +368,65 @@ public final class NutchMetrics {
* Used with {@link LatencyTracker} to emit indexing timing counters.
*/
public static final String INDEXER_LATENCY = "index_latency";
+
+ // =========================================================================
+ // Common Error Counter Names (used with component-specific groups)
+ // These constants are shared across all components for consistent error
+ // categorization. Use with ErrorTracker for automatic classification.
+ // =========================================================================
+
+ /**
+ * Total errors across all categories.
+ * This is incremented alongside any category-specific error counter.
+ */
+ public static final String ERROR_TOTAL = "errors_total";
+
+ /**
+ * Network-related errors.
+ * Includes: IOException, SocketException, ConnectException, UnknownHostException
+ */
+ public static final String ERROR_NETWORK_TOTAL = "errors_network_total";
+
+ /**
+ * Protocol errors.
+ * Includes: ProtocolException, ProtocolNotFound
+ */
+ public static final String ERROR_PROTOCOL_TOTAL = "errors_protocol_total";
+
+ /**
+ * Parsing errors.
+ * Includes: ParseException, ParserNotFound
+ */
+ public static final String ERROR_PARSING_TOTAL = "errors_parsing_total";
+
+ /**
+ * URL-related errors.
+ * Includes: MalformedURLException, URLFilterException
+ */
+ public static final String ERROR_URL_TOTAL = "errors_url_total";
+
+ /**
+ * Scoring filter errors.
+ * Includes: ScoringFilterException
+ */
+ public static final String ERROR_SCORING_TOTAL = "errors_scoring_total";
+
+ /**
+ * Indexing filter errors.
+ * Includes: IndexingException
+ */
+ public static final String ERROR_INDEXING_TOTAL = "errors_indexing_total";
+
+ /**
+ * Timeout errors.
+ * Includes: SocketTimeoutException, connection timeouts
+ */
+ public static final String ERROR_TIMEOUT_TOTAL = "errors_timeout_total";
+
+ /**
+ * Other uncategorized errors.
+ * Used as fallback for exceptions not matching any specific category.
+ */
+ public static final String ERROR_OTHER_TOTAL = "errors_other_total";
}
diff --git a/src/java/org/apache/nutch/parse/ParseSegment.java b/src/java/org/apache/nutch/parse/ParseSegment.java
index a7fbe066c..0b2a6f229 100644
--- a/src/java/org/apache/nutch/parse/ParseSegment.java
+++ b/src/java/org/apache/nutch/parse/ParseSegment.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.nutch.metadata.Metadata;
import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.metrics.ErrorTracker;
import org.apache.nutch.metrics.LatencyTracker;
import org.apache.nutch.metrics.NutchMetrics;
import org.apache.nutch.net.protocols.Response;
@@ -83,6 +84,7 @@ public class ParseSegment extends NutchTool implements Tool {
private ScoringFilters scfilters;
private boolean skipTruncated;
private LatencyTracker parseLatencyTracker;
+ private ErrorTracker errorTracker;
@Override
public void setup(Mapper<WritableComparable<?>, Content, Text, ParseImpl>.Context context) {
@@ -91,6 +93,8 @@ public class ParseSegment extends NutchTool implements Tool {
skipTruncated = conf.getBoolean(SKIP_TRUNCATED, true);
parseLatencyTracker = new LatencyTracker(
NutchMetrics.GROUP_PARSER, NutchMetrics.PARSER_LATENCY);
+ // Initialize error tracker with cached counters
+ errorTracker = new ErrorTracker(NutchMetrics.GROUP_PARSER, context);
}
@Override
@@ -133,6 +137,7 @@ public class ParseSegment extends NutchTool implements Tool {
parseResult = parseUtil.parse(content);
} catch (Exception e) {
LOG.warn("Error parsing: {}: {}", key, StringUtils.stringifyException(e));
+ errorTracker.incrementCounters(e);
return;
}
@@ -164,6 +169,7 @@ public class ParseSegment extends NutchTool implements Tool {
scfilters.passScoreAfterParsing(url, content, parse);
} catch (ScoringFilterException e) {
LOG.warn("Error passing score: {}: {}", url, e.getMessage());
+ errorTracker.incrementCounters(ErrorTracker.ErrorType.SCORING);
}
long end = System.currentTimeMillis();
diff --git a/src/java/org/apache/nutch/tools/warc/WARCExporter.java b/src/java/org/apache/nutch/tools/warc/WARCExporter.java
index 96e8c5a97..f271adfe9 100644
--- a/src/java/org/apache/nutch/tools/warc/WARCExporter.java
+++ b/src/java/org/apache/nutch/tools/warc/WARCExporter.java
@@ -58,6 +58,7 @@ import org.apache.nutch.parse.ParseSegment;
import org.apache.nutch.parse.ParseText;
import org.apache.nutch.protocol.Content;
import org.apache.nutch.tools.WARCUtils;
+import org.apache.nutch.metrics.ErrorTracker;
import org.apache.nutch.metrics.NutchMetrics;
import org.apache.nutch.util.HadoopFSUtil;
import org.apache.nutch.util.NutchConfiguration;
@@ -117,9 +118,8 @@ public class WARCExporter extends Configured implements Tool {
private Counter missingContentCounter;
private Counter missingMetadataCounter;
private Counter omittedEmptyResponseCounter;
- private Counter invalidUriCounter;
private Counter recordsGeneratedCounter;
- private Counter exceptionCounter;
+ private ErrorTracker errorTracker;
@Override
public void setup(Context context) {
@@ -130,12 +130,10 @@ public class WARCExporter extends Configured implements Tool {
NutchMetrics.GROUP_WARC_EXPORTER, NutchMetrics.WARC_MISSING_METADATA_TOTAL);
omittedEmptyResponseCounter = context.getCounter(
NutchMetrics.GROUP_WARC_EXPORTER, NutchMetrics.WARC_OMITTED_EMPTY_RESPONSE_TOTAL);
- invalidUriCounter = context.getCounter(
- NutchMetrics.GROUP_WARC_EXPORTER, NutchMetrics.WARC_INVALID_URI_TOTAL);
recordsGeneratedCounter = context.getCounter(
NutchMetrics.GROUP_WARC_EXPORTER, NutchMetrics.WARC_RECORDS_GENERATED_TOTAL);
- exceptionCounter = context.getCounter(
- NutchMetrics.GROUP_WARC_EXPORTER, NutchMetrics.WARC_EXCEPTION_TOTAL);
+ // Initialize error tracker with cached counters
+ errorTracker = new ErrorTracker(NutchMetrics.GROUP_WARC_EXPORTER, context);
}
@Override
@@ -263,7 +261,7 @@ public class WARCExporter extends Configured implements Tool {
.append(uri.toASCIIString()).append(CRLF);
} catch (Exception e) {
LOG.error("Invalid URI {} ", key);
- invalidUriCounter.increment(1);
+ errorTracker.incrementCounters(e);
return;
}
@@ -300,7 +298,7 @@ public class WARCExporter extends Configured implements Tool {
LOG.error(
"Exception when generating WARC resource record for {} : {}", key,
exception.getMessage());
- exceptionCounter.increment(1);
+ errorTracker.incrementCounters(exception);
}
// Do we need to emit a metadata record too?
@@ -342,7 +340,7 @@ public class WARCExporter extends Configured implements Tool {
.append(uri.toASCIIString()).append(CRLF);
} catch (Exception e) {
LOG.error("Invalid URI {} ", key);
- invalidUriCounter.increment(1);
+ errorTracker.incrementCounters(e);
return;
}
@@ -363,7 +361,7 @@ public class WARCExporter extends Configured implements Tool {
LOG.error(
"Exception when generating WARC metadata record for {} : {}", key, exception.getMessage(), exception);
- exceptionCounter.increment(1);
+ errorTracker.incrementCounters(exception);
}
}
@@ -401,7 +399,7 @@ public class WARCExporter extends Configured implements Tool {
.append(uri.toASCIIString()).append(CRLF);
} catch (Exception e) {
LOG.error("Invalid URI {} ", key);
- invalidUriCounter.increment(1);
+ errorTracker.incrementCounters(e);
return;
}
@@ -422,7 +420,7 @@ public class WARCExporter extends Configured implements Tool {
LOG.error(
"Exception when generating WARC metadata record for {} : {}", key, exception.getMessage(), exception);
- exceptionCounter.increment(1);
+ errorTracker.incrementCounters(exception);
}
}
}
diff --git a/src/java/org/apache/nutch/util/SitemapProcessor.java b/src/java/org/apache/nutch/util/SitemapProcessor.java
index a0378ec63..4b55a72eb 100644
--- a/src/java/org/apache/nutch/util/SitemapProcessor.java
+++ b/src/java/org/apache/nutch/util/SitemapProcessor.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.hostdb.HostDatum;
+import org.apache.nutch.metrics.ErrorTracker;
import org.apache.nutch.metrics.NutchMetrics;
import org.apache.nutch.net.URLFilters;
import org.apache.nutch.net.URLNormalizers;
@@ -121,6 +122,7 @@ public class SitemapProcessor extends Configured implements Tool {
private Counter fromHostnameCounter;
private Counter filteredFromHostnameCounter;
private Counter failedFetchesCounter;
+ private ErrorTracker errorTracker;
@Override
public void setup(Context context) {
@@ -159,6 +161,8 @@ public class SitemapProcessor extends Configured implements Tool {
NutchMetrics.GROUP_SITEMAP, NutchMetrics.SITEMAP_FILTERED_FROM_HOSTNAME_TOTAL);
failedFetchesCounter = context.getCounter(
NutchMetrics.GROUP_SITEMAP, NutchMetrics.SITEMAP_FAILED_FETCHES_TOTAL);
+ // Initialize error tracker with cached counters
+ errorTracker = new ErrorTracker(NutchMetrics.GROUP_SITEMAP, context);
}
@Override
@@ -196,6 +200,7 @@ public class SitemapProcessor extends Configured implements Tool {
} catch (Exception e) {
LOG.warn("Exception for record {} : {}", key.toString(), StringUtils.stringifyException(e));
+ errorTracker.incrementCounters(e);
}
}
@@ -246,6 +251,7 @@ public class SitemapProcessor extends Configured implements Tool {
}
} catch (Exception e) {
LOG.warn("Exception for record {} : {}", host, StringUtils.stringifyException(e));
+ errorTracker.incrementCounters(e);
}
}
diff --git a/src/test/org/apache/nutch/metrics/TestErrorTracker.java b/src/test/org/apache/nutch/metrics/TestErrorTracker.java
new file mode 100644
index 000000000..5caa3e3a7
--- /dev/null
+++ b/src/test/org/apache/nutch/metrics/TestErrorTracker.java
@@ -0,0 +1,514 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.metrics;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.MalformedURLException;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.nutch.indexer.IndexingException;
+import org.apache.nutch.net.URLFilterException;
+import org.apache.nutch.parse.ParseException;
+import org.apache.nutch.parse.ParserNotFound;
+import org.apache.nutch.protocol.ProtocolException;
+import org.apache.nutch.protocol.ProtocolNotFound;
+import org.apache.nutch.scoring.ScoringFilterException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.xml.sax.SAXException;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.*;
+
+import org.apache.nutch.metrics.ErrorTracker.ErrorType;
+
+/**
+ * Unit tests for {@link ErrorTracker} categorization, counting, and Hadoop
+ * counter integration.
+ */
+@ExtendWith(MockitoExtension.class)
+public class TestErrorTracker {
+
+ @Mock
+ private TaskInputOutputContext<?, ?, ?, ?> mockContext;
+
+ @Mock
+ private Counter mockCounter;
+
+ @BeforeEach
+ void setUp() {
+ // Configure mock context to return mock counter for any counter request
+ lenient().when(mockContext.getCounter(anyString(), anyString())).thenReturn(mockCounter);
+ }
+
+ // =========================================================================
+ // Network Error Categorization Tests
+ // =========================================================================
+
+ @Test
+ public void testCategorizeNetworkErrors() {
+ // Test IOException
+ assertEquals(ErrorType.NETWORK,
+ ErrorTracker.categorize(new IOException("Connection failed")));
+
+ // Test SocketException
+ assertEquals(ErrorType.NETWORK,
+ ErrorTracker.categorize(new SocketException("Socket closed")));
+
+ // Test UnknownHostException
+ assertEquals(ErrorType.NETWORK,
+ ErrorTracker.categorize(new UnknownHostException("example.com")));
+
+ // Test ConnectException
+ assertEquals(ErrorType.NETWORK,
+ ErrorTracker.categorize(new ConnectException("Connection refused")));
+ }
+
+ // =========================================================================
+ // Timeout Error Categorization Tests
+ // =========================================================================
+
+ @Test
+ public void testCategorizeTimeoutErrors() {
+ // Test SocketTimeoutException
+ assertEquals(ErrorType.TIMEOUT,
+ ErrorTracker.categorize(new SocketTimeoutException("Read timed out")));
+ }
+
+ @Test
+ public void testCategorizeTimeoutByClassName() {
+ // Test custom exception with "Timeout" in class name
+ // The categorize method checks className.contains("Timeout")
+ Exception customTimeout = new CustomTimeoutException("Custom timeout");
+ assertEquals(ErrorType.TIMEOUT, ErrorTracker.categorize(customTimeout));
+ }
+
+ // Custom exception class for testing class name-based detection
+ private static class CustomTimeoutException extends Exception {
+ CustomTimeoutException(String message) {
+ super(message);
+ }
+ }
+
+ // =========================================================================
+ // URL Error Categorization Tests
+ // =========================================================================
+
+ @Test
+ public void testCategorizeUrlErrors() {
+ // Test MalformedURLException
+ assertEquals(ErrorType.URL,
+ ErrorTracker.categorize(new MalformedURLException("Invalid URL")));
+
+ // Test URISyntaxException
+ assertEquals(ErrorType.URL,
+ ErrorTracker.categorize(new URISyntaxException("bad uri", "Invalid syntax")));
+ }
+
+ @Test
+ public void testCategorizeUrlFilterException() {
+ // Test URLFilterException (Nutch-specific)
+ assertEquals(ErrorType.URL,
+ ErrorTracker.categorize(new URLFilterException("URL filtered")));
+ }
+
+ // =========================================================================
+ // Protocol Error Categorization Tests
+ // =========================================================================
+
+ @Test
+ public void testCategorizeProtocolErrors() {
+ // Test ProtocolException (Nutch-specific)
+ assertEquals(ErrorType.PROTOCOL,
+ ErrorTracker.categorize(new ProtocolException("Protocol error")));
+
+ // Test ProtocolNotFound (Nutch-specific)
+ assertEquals(ErrorType.PROTOCOL,
+ ErrorTracker.categorize(new ProtocolNotFound("ftp")));
+ }
+
+ // =========================================================================
+ // Parsing Error Categorization Tests
+ // =========================================================================
+
+ @Test
+ public void testCategorizeParsingErrors() {
+ // Test ParseException (Nutch-specific)
+ assertEquals(ErrorType.PARSING,
+ ErrorTracker.categorize(new ParseException("Parse failed")));
+
+ // Test ParserNotFound (Nutch-specific)
+ assertEquals(ErrorType.PARSING,
+ ErrorTracker.categorize(new ParserNotFound("text/unknown")));
+
+ // Test SAXException
+ assertEquals(ErrorType.PARSING,
+ ErrorTracker.categorize(new SAXException("XML parse error")));
+ }
+
+ // =========================================================================
+ // Scoring Error Categorization Tests
+ // =========================================================================
+
+ @Test
+ public void testCategorizeScoringErrors() {
+ // Test ScoringFilterException (Nutch-specific)
+ assertEquals(ErrorType.SCORING,
+ ErrorTracker.categorize(new ScoringFilterException("Scoring failed")));
+ }
+
+ // =========================================================================
+ // Indexing Error Categorization Tests
+ // =========================================================================
+
+ @Test
+ public void testCategorizeIndexingErrors() {
+ // Test IndexingException (Nutch-specific)
+ assertEquals(ErrorType.INDEXING,
+ ErrorTracker.categorize(new IndexingException("Indexing failed")));
+ }
+
+ // =========================================================================
+ // Other/Fallback Categorization Tests
+ // =========================================================================
+
+ @Test
+ public void testCategorizeNullThrowable() {
+ // Null should return OTHER
+ assertEquals(ErrorType.OTHER, ErrorTracker.categorize(null));
+ }
+
+ @Test
+ public void testCategorizeGenericException() {
+ // Generic Exception should return OTHER
+ assertEquals(ErrorType.OTHER,
+ ErrorTracker.categorize(new Exception("Generic error")));
+
+ // RuntimeException should return OTHER
+ assertEquals(ErrorType.OTHER,
+ ErrorTracker.categorize(new RuntimeException("Runtime error")));
+ }
+
+ // =========================================================================
+ // Cause Chain Categorization Tests
+ // =========================================================================
+
+ @Test
+ public void testCategorizeCauseChain() {
+ // Exception with a network cause should be categorized as NETWORK
+ IOException cause = new IOException("Root cause");
+ Exception wrapper = new Exception("Wrapper", cause);
+ assertEquals(ErrorType.NETWORK, ErrorTracker.categorize(wrapper));
+
+ // Exception with a timeout cause should be categorized as TIMEOUT
+ SocketTimeoutException timeoutCause = new SocketTimeoutException("Timeout");
+ Exception timeoutWrapper = new Exception("Wrapper", timeoutCause);
+ assertEquals(ErrorType.TIMEOUT, ErrorTracker.categorize(timeoutWrapper));
+ }
+
+ @Test
+ public void testCategorizeNestedCauseChain() {
+ // Deep nested cause chain: RuntimeException -> Exception -> IOException
+ IOException rootCause = new IOException("Root cause");
+ Exception middleWrapper = new Exception("Middle", rootCause);
+ RuntimeException outerWrapper = new RuntimeException("Outer", middleWrapper);
+ assertEquals(ErrorType.NETWORK, ErrorTracker.categorize(outerWrapper));
+
+ // Deep nested with Nutch-specific exception
+ ScoringFilterException scoringCause = new ScoringFilterException("Scoring error");
+ Exception wrapper1 = new Exception("Wrapper 1", scoringCause);
+ Exception wrapper2 = new Exception("Wrapper 2", wrapper1);
+ assertEquals(ErrorType.SCORING, ErrorTracker.categorize(wrapper2));
+ }
+
+ // =========================================================================
+ // Record Error Tests (Local Accumulation)
+ // =========================================================================
+
+ @Test
+ public void testRecordErrorByType() {
+ ErrorTracker tracker = new ErrorTracker(NutchMetrics.GROUP_FETCHER);
+
+ // Initially all counts should be 0
+ assertEquals(0, tracker.getTotalCount());
+ assertEquals(0, tracker.getCount(ErrorType.NETWORK));
+
+ // Record a NETWORK error
+ tracker.recordError(ErrorType.NETWORK);
+ assertEquals(1, tracker.getTotalCount());
+ assertEquals(1, tracker.getCount(ErrorType.NETWORK));
+ assertEquals(0, tracker.getCount(ErrorType.TIMEOUT));
+
+ // Record another NETWORK error
+ tracker.recordError(ErrorType.NETWORK);
+ assertEquals(2, tracker.getTotalCount());
+ assertEquals(2, tracker.getCount(ErrorType.NETWORK));
+
+ // Record a TIMEOUT error
+ tracker.recordError(ErrorType.TIMEOUT);
+ assertEquals(3, tracker.getTotalCount());
+ assertEquals(2, tracker.getCount(ErrorType.NETWORK));
+ assertEquals(1, tracker.getCount(ErrorType.TIMEOUT));
+ }
+
+ @Test
+ public void testRecordErrorByThrowable() {
+ ErrorTracker tracker = new ErrorTracker(NutchMetrics.GROUP_FETCHER);
+
+ // Record an IOException (should be categorized as NETWORK)
+ tracker.recordError(new IOException("Test"));
+ assertEquals(1, tracker.getTotalCount());
+ assertEquals(1, tracker.getCount(ErrorType.NETWORK));
+
+ // Record a SocketTimeoutException (should be categorized as TIMEOUT)
+ tracker.recordError(new SocketTimeoutException("Test"));
+ assertEquals(2, tracker.getTotalCount());
+ assertEquals(1, tracker.getCount(ErrorType.TIMEOUT));
+
+ // Record a MalformedURLException (should be categorized as URL)
+ tracker.recordError(new MalformedURLException("Test"));
+ assertEquals(3, tracker.getTotalCount());
+ assertEquals(1, tracker.getCount(ErrorType.URL));
+ }
+
+ // =========================================================================
+ // Counter Name Mapping Tests
+ // =========================================================================
+
+ @Test
+ public void testGetCounterName() {
+ // Test counter name mapping
+ assertEquals(NutchMetrics.ERROR_NETWORK_TOTAL,
+ ErrorTracker.getCounterName(ErrorType.NETWORK));
+ assertEquals(NutchMetrics.ERROR_PROTOCOL_TOTAL,
+ ErrorTracker.getCounterName(ErrorType.PROTOCOL));
+ assertEquals(NutchMetrics.ERROR_PARSING_TOTAL,
+ ErrorTracker.getCounterName(ErrorType.PARSING));
+ assertEquals(NutchMetrics.ERROR_URL_TOTAL,
+ ErrorTracker.getCounterName(ErrorType.URL));
+ assertEquals(NutchMetrics.ERROR_SCORING_TOTAL,
+ ErrorTracker.getCounterName(ErrorType.SCORING));
+ assertEquals(NutchMetrics.ERROR_INDEXING_TOTAL,
+ ErrorTracker.getCounterName(ErrorType.INDEXING));
+ assertEquals(NutchMetrics.ERROR_TIMEOUT_TOTAL,
+ ErrorTracker.getCounterName(ErrorType.TIMEOUT));
+ assertEquals(NutchMetrics.ERROR_OTHER_TOTAL,
+ ErrorTracker.getCounterName(ErrorType.OTHER));
+ }
+
+ @Test
+ public void testGetCounterNameForThrowable() {
+ // Test getting counter name directly from throwable
+ assertEquals(NutchMetrics.ERROR_NETWORK_TOTAL,
+ ErrorTracker.getCounterName(new IOException("Test")));
+ assertEquals(NutchMetrics.ERROR_TIMEOUT_TOTAL,
+ ErrorTracker.getCounterName(new SocketTimeoutException("Test")));
+ assertEquals(NutchMetrics.ERROR_URL_TOTAL,
+ ErrorTracker.getCounterName(new MalformedURLException("Test")));
+ assertEquals(NutchMetrics.ERROR_OTHER_TOTAL,
+ ErrorTracker.getCounterName(new RuntimeException("Test")));
+
+ // Test Nutch-specific exceptions
+ assertEquals(NutchMetrics.ERROR_PROTOCOL_TOTAL,
+ ErrorTracker.getCounterName(new ProtocolException("Test")));
+ assertEquals(NutchMetrics.ERROR_PARSING_TOTAL,
+ ErrorTracker.getCounterName(new ParseException("Test")));
+ assertEquals(NutchMetrics.ERROR_SCORING_TOTAL,
+ ErrorTracker.getCounterName(new ScoringFilterException("Test")));
+ assertEquals(NutchMetrics.ERROR_INDEXING_TOTAL,
+ ErrorTracker.getCounterName(new IndexingException("Test")));
+ }
+
+ // =========================================================================
+ // Hadoop Context Integration Tests (Using Mocks)
+ // =========================================================================
+
+ @Test
+ public void testConstructorWithContext() {
+ // Create ErrorTracker with context - should initialize counters
+ ErrorTracker tracker = new ErrorTracker(NutchMetrics.GROUP_FETCHER, mockContext);
+
+ // Verify counters were requested from context
+ // Total counter + 8 error type counters = 9 calls
+ verify(mockContext, atLeast(9)).getCounter(anyString(), anyString());
+ }
+
+ @Test
+ public void testInitCounters() {
+ ErrorTracker tracker = new ErrorTracker(NutchMetrics.GROUP_FETCHER);
+
+ // Initialize counters
+ tracker.initCounters(mockContext);
+
+ // Verify counters were requested
+ verify(mockContext).getCounter(NutchMetrics.GROUP_FETCHER, NutchMetrics.ERROR_TOTAL);
+ verify(mockContext).getCounter(NutchMetrics.GROUP_FETCHER, NutchMetrics.ERROR_NETWORK_TOTAL);
+ verify(mockContext).getCounter(NutchMetrics.GROUP_FETCHER, NutchMetrics.ERROR_TIMEOUT_TOTAL);
+ }
+
+ @Test
+ public void testIncrementCountersWithType() {
+ ErrorTracker tracker = new ErrorTracker(NutchMetrics.GROUP_FETCHER, mockContext);
+
+ // Increment counters directly
+ tracker.incrementCounters(ErrorType.NETWORK);
+
+ // Verify counter was incremented (total + specific type)
+ verify(mockCounter, times(2)).increment(1);
+ }
+
+ @Test
+ public void testIncrementCountersWithThrowable() {
+ ErrorTracker tracker = new ErrorTracker(NutchMetrics.GROUP_FETCHER, mockContext);
+
+ // Increment counters with throwable
+ tracker.incrementCounters(new IOException("Test"));
+
+ // Verify counter was incremented (total + NETWORK type)
+ verify(mockCounter, times(2)).increment(1);
+ }
+
+ @Test
+ public void testIncrementCountersWithoutInit() {
+ ErrorTracker tracker = new ErrorTracker(NutchMetrics.GROUP_FETCHER);
+
+ // Should throw IllegalStateException when counters not initialized
+ assertThrows(IllegalStateException.class, () -> {
+ tracker.incrementCounters(ErrorType.NETWORK);
+ });
+ }
+
+ @Test
+ public void testEmitCounters() {
+ ErrorTracker tracker = new ErrorTracker(NutchMetrics.GROUP_FETCHER);
+
+ // Record some errors locally
+ tracker.recordError(ErrorType.NETWORK);
+ tracker.recordError(ErrorType.NETWORK);
+ tracker.recordError(ErrorType.TIMEOUT);
+
+ // Emit counters (without cached counters - uses fallback)
+ tracker.emitCounters(mockContext);
+
+ // Verify counters were requested and incremented
+ verify(mockContext).getCounter(NutchMetrics.GROUP_FETCHER, NutchMetrics.ERROR_TOTAL);
+ verify(mockContext).getCounter(NutchMetrics.GROUP_FETCHER, NutchMetrics.ERROR_NETWORK_TOTAL);
+ verify(mockContext).getCounter(NutchMetrics.GROUP_FETCHER, NutchMetrics.ERROR_TIMEOUT_TOTAL);
+ }
+
+ @Test
+ public void testEmitCountersWithCachedCounters() {
+ ErrorTracker tracker = new ErrorTracker(NutchMetrics.GROUP_FETCHER, mockContext);
+
+ // Reset mock to clear constructor calls
+ reset(mockCounter);
+
+ // Record some errors locally
+ tracker.recordError(ErrorType.NETWORK);
+ tracker.recordError(ErrorType.NETWORK);
+ tracker.recordError(ErrorType.TIMEOUT);
+
+ // Emit counters (with cached counters)
+ tracker.emitCounters(mockContext);
+
+ // Verify cached counters were used (increment called with accumulated values)
+ verify(mockCounter).increment(3L); // total count
+ verify(mockCounter).increment(2L); // NETWORK count
+ verify(mockCounter).increment(1L); // TIMEOUT count
+ }
+
+ // =========================================================================
+ // Thread Safety Tests
+ // =========================================================================
+
+ @Test
+ public void testThreadSafety() throws InterruptedException {
+ ErrorTracker tracker = new ErrorTracker(NutchMetrics.GROUP_FETCHER);
+
+ // Create multiple threads that record errors concurrently
+ Thread[] threads = new Thread[10];
+ for (int i = 0; i < threads.length; i++) {
+ threads[i] = new Thread(() -> {
+ for (int j = 0; j < 100; j++) {
+ tracker.recordError(ErrorType.NETWORK);
+ }
+ });
+ }
+
+ // Start all threads
+ for (Thread thread : threads) {
+ thread.start();
+ }
+
+ // Wait for all threads to complete
+ for (Thread thread : threads) {
+ thread.join();
+ }
+
+ // Verify counts
+ assertEquals(1000, tracker.getTotalCount());
+ assertEquals(1000, tracker.getCount(ErrorType.NETWORK));
+ }
+
+ @Test
+ public void testThreadSafetyMixedErrorTypes() throws InterruptedException {
+ ErrorTracker tracker = new ErrorTracker(NutchMetrics.GROUP_FETCHER);
+
+ // Create threads that record different error types concurrently
+ Thread networkThread = new Thread(() -> {
+ for (int i = 0; i < 500; i++) {
+ tracker.recordError(ErrorType.NETWORK);
+ }
+ });
+
+ Thread timeoutThread = new Thread(() -> {
+ for (int i = 0; i < 300; i++) {
+ tracker.recordError(ErrorType.TIMEOUT);
+ }
+ });
+
+ Thread urlThread = new Thread(() -> {
+ for (int i = 0; i < 200; i++) {
+ tracker.recordError(ErrorType.URL);
+ }
+ });
+
+ networkThread.start();
+ timeoutThread.start();
+ urlThread.start();
+
+ networkThread.join();
+ timeoutThread.join();
+ urlThread.join();
+
+ // Verify counts
+ assertEquals(1000, tracker.getTotalCount());
+ assertEquals(500, tracker.getCount(ErrorType.NETWORK));
+ assertEquals(300, tracker.getCount(ErrorType.TIMEOUT));
+ assertEquals(200, tracker.getCount(ErrorType.URL));
+ }
+}