This is an automated email from the ASF dual-hosted git repository.
lewismc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nutch.git
The following commit(s) were added to refs/heads/master by this push:
new ca2591e17 NUTCH-3134 Add latency metrics with percentile support to
Fetcher, Parser, and Indexer (#876)
ca2591e17 is described below
commit ca2591e1761393acc667dfe2cb575249c86179c6
Author: Lewis John McGibbney <[email protected]>
AuthorDate: Wed Dec 17 19:28:41 2025 -0800
NUTCH-3134 Add latency metrics with percentile support to Fetcher, Parser,
and Indexer (#876)
---
.../org/apache/nutch/fetcher/FetcherThread.java | 13 ++
.../org/apache/nutch/indexer/IndexerMapReduce.java | 21 +++
.../org/apache/nutch/metrics/LatencyTracker.java | 144 +++++++++++++++++++++
.../org/apache/nutch/metrics/NutchMetrics.java | 22 ++++
src/java/org/apache/nutch/parse/ParseSegment.java | 15 ++-
5 files changed, 214 insertions(+), 1 deletion(-)
diff --git a/src/java/org/apache/nutch/fetcher/FetcherThread.java
b/src/java/org/apache/nutch/fetcher/FetcherThread.java
index 8b4e5c95c..02b7cd3e8 100644
--- a/src/java/org/apache/nutch/fetcher/FetcherThread.java
+++ b/src/java/org/apache/nutch/fetcher/FetcherThread.java
@@ -38,6 +38,7 @@ import org.apache.nutch.crawl.NutchWritable;
import org.apache.nutch.crawl.SignatureFactory;
import org.apache.nutch.fetcher.Fetcher.FetcherRun;
import org.apache.nutch.fetcher.FetcherThreadEvent.PublishEventType;
+import org.apache.nutch.metrics.LatencyTracker;
import org.apache.nutch.metrics.NutchMetrics;
import org.apache.nutch.metadata.Metadata;
import org.apache.nutch.metadata.Nutch;
@@ -166,6 +167,9 @@ public class FetcherThread extends Thread {
private Counter outlinksDetectedCounter;
private Counter outlinksFollowingCounter;
+ // Latency tracker for fetch timing metrics
+ private LatencyTracker fetchLatencyTracker;
+
public FetcherThread(Configuration conf, AtomicInteger activeThreads,
FetchItemQueues fetchQueues,
QueueFeeder feeder, AtomicInteger spinWaiting, AtomicLong
lastRequestStart, FetcherRun.Context context,
AtomicInteger errors, String segmentName, boolean parsing, boolean
storingContent,
@@ -284,6 +288,10 @@ public class FetcherThread extends Thread {
NutchMetrics.GROUP_FETCHER_OUTLINKS,
NutchMetrics.FETCHER_OUTLINKS_DETECTED_TOTAL);
outlinksFollowingCounter = context.getCounter(
NutchMetrics.GROUP_FETCHER_OUTLINKS,
NutchMetrics.FETCHER_OUTLINKS_FOLLOWING_TOTAL);
+
+ // Initialize latency tracker for fetch timing
+ fetchLatencyTracker = new LatencyTracker(
+ NutchMetrics.GROUP_FETCHER, NutchMetrics.FETCHER_LATENCY);
}
@Override
@@ -417,8 +425,11 @@ public class FetcherThread extends Thread {
fit.queueID, fiq.crawlDelay, fit.url);
}
}
+ // Track fetch latency
+ long fetchStart = System.currentTimeMillis();
ProtocolOutput output = protocol.getProtocolOutput(fit.url,
fit.datum);
+ fetchLatencyTracker.record(System.currentTimeMillis() -
fetchStart);
ProtocolStatus status = output.getStatus();
Content content = output.getContent();
ParseStatus pstatus = null;
@@ -557,6 +568,8 @@ public class FetcherThread extends Thread {
if (fit != null) {
fetchQueues.finishFetchItem(fit);
}
+ // Emit fetch latency metrics
+ fetchLatencyTracker.emitCounters(context);
activeThreads.decrementAndGet(); // count threads
LOG.info("{} {} -finishing thread {}, activeThreads={}", getName(),
Thread.currentThread().getId(), getName(), activeThreads);
diff --git a/src/java/org/apache/nutch/indexer/IndexerMapReduce.java
b/src/java/org/apache/nutch/indexer/IndexerMapReduce.java
index 33f2f244a..9086a1983 100644
--- a/src/java/org/apache/nutch/indexer/IndexerMapReduce.java
+++ b/src/java/org/apache/nutch/indexer/IndexerMapReduce.java
@@ -40,6 +40,7 @@ import org.apache.nutch.crawl.CrawlDb;
import org.apache.nutch.crawl.Inlinks;
import org.apache.nutch.crawl.LinkDb;
import org.apache.nutch.crawl.NutchWritable;
+import org.apache.nutch.metrics.LatencyTracker;
import org.apache.nutch.metrics.NutchMetrics;
import org.apache.nutch.metadata.Metadata;
import org.apache.nutch.metadata.Nutch;
@@ -215,6 +216,9 @@ public class IndexerMapReduce extends Configured {
private URLNormalizers urlNormalizers;
private URLFilters urlFilters;
+ // Latency tracker for indexing timing metrics
+ private LatencyTracker indexLatencyTracker;
+
@Override
public void setup(Reducer<Text, NutchWritable, Text,
NutchIndexAction>.Context context) {
Configuration conf = context.getConfiguration();
@@ -239,6 +243,17 @@ public class IndexerMapReduce extends Configured {
if (filter) {
urlFilters = new URLFilters(conf);
}
+
+ // Initialize latency tracker for indexing timing
+ indexLatencyTracker = new LatencyTracker(
+ NutchMetrics.GROUP_INDEXER, NutchMetrics.INDEXER_LATENCY);
+ }
+
+ @Override
+ public void cleanup(Reducer<Text, NutchWritable, Text,
NutchIndexAction>.Context context)
+ throws IOException, InterruptedException {
+ // Emit indexing latency metrics
+ indexLatencyTracker.emitCounters(context);
}
@Override
@@ -343,6 +358,9 @@ public class IndexerMapReduce extends Configured {
return;
}
+ // Start timing document indexing
+ long indexStart = System.currentTimeMillis();
+
NutchDocument doc = new NutchDocument();
doc.add("id", key.toString());
@@ -432,6 +450,9 @@ public class IndexerMapReduce extends Configured {
doc.add("binaryContent", binary);
}
+ // Record indexing latency
+ indexLatencyTracker.record(System.currentTimeMillis() - indexStart);
+
context.getCounter(NutchMetrics.GROUP_INDEXER,
NutchMetrics.INDEXER_INDEXED_TOTAL).increment(1);
diff --git a/src/java/org/apache/nutch/metrics/LatencyTracker.java
b/src/java/org/apache/nutch/metrics/LatencyTracker.java
new file mode 100644
index 000000000..3777bb29e
--- /dev/null
+++ b/src/java/org/apache/nutch/metrics/LatencyTracker.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.metrics;
+
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+import com.tdunning.math.stats.TDigest;
+
+/**
+ * A utility class for tracking latency metrics using TDigest for percentile
+ * calculation.
+ *
+ * <p>This class wraps a TDigest data structure to collect latency samples and
+ * emit Hadoop counters with count, sum, and percentile values (p50, p95, p99).
+ *
+ * <p>Usage:
+ * <pre>
+ * // In mapper/reducer setup
+ * latencyTracker = new LatencyTracker(NutchMetrics.GROUP_FETCHER,
NutchMetrics.FETCHER_LATENCY);
+ *
+ * // During processing
+ * long start = System.currentTimeMillis();
+ * // ... operation ...
+ * latencyTracker.record(System.currentTimeMillis() - start);
+ *
+ * // In cleanup
+ * latencyTracker.emitCounters(context);
+ * </pre>
+ *
+ * <p>Emits the following counters:
+ * <ul>
+ * <li>{prefix}_count_total - total number of samples</li>
+ * <li>{prefix}_sum_ms - sum of all latencies in milliseconds</li>
+ * <li>{prefix}_p50_ms - 50th percentile (median) latency</li>
+ * <li>{prefix}_p95_ms - 95th percentile latency</li>
+ * <li>{prefix}_p99_ms - 99th percentile latency</li>
+ * </ul>
+ *
+ * @since 1.22
+ */
+public class LatencyTracker {
+
+ /** Default compression factor for TDigest (controls accuracy vs memory). */
+ private static final double DEFAULT_COMPRESSION = 100.0;
+
+ private final TDigest digest;
+ private final String group;
+ private final String prefix;
+ private long count = 0;
+ private long sum = 0;
+
+ /**
+ * Creates a new LatencyTracker.
+ *
+ * @param group the Hadoop counter group name
+ * @param prefix the prefix for counter names (e.g., "fetch_latency")
+ */
+ public LatencyTracker(String group, String prefix) {
+ this.digest = TDigest.createDigest(DEFAULT_COMPRESSION);
+ this.group = group;
+ this.prefix = prefix;
+ }
+
+ /**
+ * Records a latency sample.
+ *
+ * @param latencyMs the latency in milliseconds
+ */
+ public void record(long latencyMs) {
+ digest.add(latencyMs);
+ count++;
+ sum += latencyMs;
+ }
+
+ /**
+ * Returns the number of recorded samples.
+ *
+ * @return the count of recorded latency samples
+ */
+ public long getCount() {
+ return count;
+ }
+
+ /**
+ * Returns the sum of all recorded latencies.
+ *
+ * @return the sum of latencies in milliseconds
+ */
+ public long getSum() {
+ return sum;
+ }
+
+ /**
+ * Returns the percentile value for the given quantile.
+ *
+ * @param quantile the quantile (0.0 to 1.0)
+ * @return the percentile value in milliseconds
+ */
+ public long getPercentile(double quantile) {
+ if (count == 0) {
+ return 0;
+ }
+ return (long) digest.quantile(quantile);
+ }
+
+ /**
+ * Emits all latency counters to the Hadoop context.
+ *
+ * <p>Should be called once during cleanup to emit aggregated metrics.
+ *
+ * @param context the Hadoop task context
+ */
+ public void emitCounters(TaskInputOutputContext<?, ?, ?, ?> context) {
+ context.getCounter(group, prefix + "_count_total").setValue(count);
+ context.getCounter(group, prefix + "_sum_ms").setValue(sum);
+
+ if (count > 0) {
+ context.getCounter(group, prefix + "_p50_ms").setValue((long)
digest.quantile(0.50));
+ context.getCounter(group, prefix + "_p95_ms").setValue((long)
digest.quantile(0.95));
+ context.getCounter(group, prefix + "_p99_ms").setValue((long)
digest.quantile(0.99));
+ } else {
+ // Set to 0 if no samples recorded
+ context.getCounter(group, prefix + "_p50_ms").setValue(0);
+ context.getCounter(group, prefix + "_p95_ms").setValue(0);
+ context.getCounter(group, prefix + "_p99_ms").setValue(0);
+ }
+ }
+}
+
+
diff --git a/src/java/org/apache/nutch/metrics/NutchMetrics.java
b/src/java/org/apache/nutch/metrics/NutchMetrics.java
index e64a8d6d0..dea34be7f 100644
--- a/src/java/org/apache/nutch/metrics/NutchMetrics.java
+++ b/src/java/org/apache/nutch/metrics/NutchMetrics.java
@@ -367,5 +367,27 @@ public final class NutchMetrics {
/** Empty results in domain statistics. */
public static final String DOMAIN_STATS_EMPTY_RESULT_TOTAL =
"empty_result_total";
+
+ // =========================================================================
+ // Latency Metric Prefixes (used with LatencyTracker)
+ // =========================================================================
+
+ /**
+ * Prefix for fetch latency metrics.
+ * Used with {@link LatencyTracker} to emit fetch timing counters.
+ */
+ public static final String FETCHER_LATENCY = "fetch_latency";
+
+ /**
+ * Prefix for parse latency metrics.
+ * Used with {@link LatencyTracker} to emit parse timing counters.
+ */
+ public static final String PARSER_LATENCY = "parse_latency";
+
+ /**
+ * Prefix for indexer latency metrics.
+ * Used with {@link LatencyTracker} to emit indexing timing counters.
+ */
+ public static final String INDEXER_LATENCY = "index_latency";
}
diff --git a/src/java/org/apache/nutch/parse/ParseSegment.java
b/src/java/org/apache/nutch/parse/ParseSegment.java
index 5ec74ea9f..a7fbe066c 100644
--- a/src/java/org/apache/nutch/parse/ParseSegment.java
+++ b/src/java/org/apache/nutch/parse/ParseSegment.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.nutch.metadata.Metadata;
import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.metrics.LatencyTracker;
import org.apache.nutch.metrics.NutchMetrics;
import org.apache.nutch.net.protocols.Response;
import org.apache.nutch.protocol.Content;
@@ -81,12 +82,22 @@ public class ParseSegment extends NutchTool implements Tool
{
private Text newKey = new Text();
private ScoringFilters scfilters;
private boolean skipTruncated;
+ private LatencyTracker parseLatencyTracker;
@Override
public void setup(Mapper<WritableComparable<?>, Content, Text,
ParseImpl>.Context context) {
Configuration conf = context.getConfiguration();
scfilters = new ScoringFilters(conf);
skipTruncated = conf.getBoolean(SKIP_TRUNCATED, true);
+ parseLatencyTracker = new LatencyTracker(
+ NutchMetrics.GROUP_PARSER, NutchMetrics.PARSER_LATENCY);
+ }
+
+ @Override
+ public void cleanup(Mapper<WritableComparable<?>, Content, Text,
ParseImpl>.Context context)
+ throws IOException, InterruptedException {
+ // Emit parse latency metrics
+ parseLatencyTracker.emitCounters(context);
}
@Override
@@ -156,7 +167,9 @@ public class ParseSegment extends NutchTool implements Tool
{
}
long end = System.currentTimeMillis();
- LOG.info("Parsed ({}ms): {}", (end - start), url);
+ long parseTime = end - start;
+ parseLatencyTracker.record(parseTime);
+ LOG.info("Parsed ({}ms): {}", parseTime, url);
context.write(
url,