This is an automated email from the ASF dual-hosted git repository.

lewismc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nutch.git


The following commit(s) were added to refs/heads/master by this push:
     new ca2591e17 NUTCH-3134 Add latency metrics with percentile support to 
Fetcher, Parser, and Indexer (#876)
ca2591e17 is described below

commit ca2591e1761393acc667dfe2cb575249c86179c6
Author: Lewis John McGibbney <[email protected]>
AuthorDate: Wed Dec 17 19:28:41 2025 -0800

    NUTCH-3134 Add latency metrics with percentile support to Fetcher, Parser, 
and Indexer (#876)
---
 .../org/apache/nutch/fetcher/FetcherThread.java    |  13 ++
 .../org/apache/nutch/indexer/IndexerMapReduce.java |  21 +++
 .../org/apache/nutch/metrics/LatencyTracker.java   | 144 +++++++++++++++++++++
 .../org/apache/nutch/metrics/NutchMetrics.java     |  22 ++++
 src/java/org/apache/nutch/parse/ParseSegment.java  |  15 ++-
 5 files changed, 214 insertions(+), 1 deletion(-)

diff --git a/src/java/org/apache/nutch/fetcher/FetcherThread.java 
b/src/java/org/apache/nutch/fetcher/FetcherThread.java
index 8b4e5c95c..02b7cd3e8 100644
--- a/src/java/org/apache/nutch/fetcher/FetcherThread.java
+++ b/src/java/org/apache/nutch/fetcher/FetcherThread.java
@@ -38,6 +38,7 @@ import org.apache.nutch.crawl.NutchWritable;
 import org.apache.nutch.crawl.SignatureFactory;
 import org.apache.nutch.fetcher.Fetcher.FetcherRun;
 import org.apache.nutch.fetcher.FetcherThreadEvent.PublishEventType;
+import org.apache.nutch.metrics.LatencyTracker;
 import org.apache.nutch.metrics.NutchMetrics;
 import org.apache.nutch.metadata.Metadata;
 import org.apache.nutch.metadata.Nutch;
@@ -166,6 +167,9 @@ public class FetcherThread extends Thread {
   private Counter outlinksDetectedCounter;
   private Counter outlinksFollowingCounter;
 
+  // Latency tracker for fetch timing metrics
+  private LatencyTracker fetchLatencyTracker;
+
   public FetcherThread(Configuration conf, AtomicInteger activeThreads, 
FetchItemQueues fetchQueues, 
       QueueFeeder feeder, AtomicInteger spinWaiting, AtomicLong 
lastRequestStart, FetcherRun.Context context,
       AtomicInteger errors, String segmentName, boolean parsing, boolean 
storingContent, 
@@ -284,6 +288,10 @@ public class FetcherThread extends Thread {
         NutchMetrics.GROUP_FETCHER_OUTLINKS, 
NutchMetrics.FETCHER_OUTLINKS_DETECTED_TOTAL);
     outlinksFollowingCounter = context.getCounter(
         NutchMetrics.GROUP_FETCHER_OUTLINKS, 
NutchMetrics.FETCHER_OUTLINKS_FOLLOWING_TOTAL);
+    
+    // Initialize latency tracker for fetch timing
+    fetchLatencyTracker = new LatencyTracker(
+        NutchMetrics.GROUP_FETCHER, NutchMetrics.FETCHER_LATENCY);
   }
 
   @Override
@@ -417,8 +425,11 @@ public class FetcherThread extends Thread {
                     fit.queueID, fiq.crawlDelay, fit.url);
               }
             }
+            // Track fetch latency
+            long fetchStart = System.currentTimeMillis();
             ProtocolOutput output = protocol.getProtocolOutput(fit.url,
                 fit.datum);
+            fetchLatencyTracker.record(System.currentTimeMillis() - 
fetchStart);
             ProtocolStatus status = output.getStatus();
             Content content = output.getContent();
             ParseStatus pstatus = null;
@@ -557,6 +568,8 @@ public class FetcherThread extends Thread {
       if (fit != null) {
         fetchQueues.finishFetchItem(fit);
       }
+      // Emit fetch latency metrics
+      fetchLatencyTracker.emitCounters(context);
       activeThreads.decrementAndGet(); // count threads
       LOG.info("{} {} -finishing thread {}, activeThreads={}", getName(),
           Thread.currentThread().getId(), getName(), activeThreads);
diff --git a/src/java/org/apache/nutch/indexer/IndexerMapReduce.java 
b/src/java/org/apache/nutch/indexer/IndexerMapReduce.java
index 33f2f244a..9086a1983 100644
--- a/src/java/org/apache/nutch/indexer/IndexerMapReduce.java
+++ b/src/java/org/apache/nutch/indexer/IndexerMapReduce.java
@@ -40,6 +40,7 @@ import org.apache.nutch.crawl.CrawlDb;
 import org.apache.nutch.crawl.Inlinks;
 import org.apache.nutch.crawl.LinkDb;
 import org.apache.nutch.crawl.NutchWritable;
+import org.apache.nutch.metrics.LatencyTracker;
 import org.apache.nutch.metrics.NutchMetrics;
 import org.apache.nutch.metadata.Metadata;
 import org.apache.nutch.metadata.Nutch;
@@ -215,6 +216,9 @@ public class IndexerMapReduce extends Configured {
     private URLNormalizers urlNormalizers;
     private URLFilters urlFilters;
 
+    // Latency tracker for indexing timing metrics
+    private LatencyTracker indexLatencyTracker;
+
     @Override
     public void setup(Reducer<Text, NutchWritable, Text, 
NutchIndexAction>.Context context) {
       Configuration conf = context.getConfiguration();
@@ -239,6 +243,17 @@ public class IndexerMapReduce extends Configured {
       if (filter) {
         urlFilters = new URLFilters(conf);
       }
+
+      // Initialize latency tracker for indexing timing
+      indexLatencyTracker = new LatencyTracker(
+          NutchMetrics.GROUP_INDEXER, NutchMetrics.INDEXER_LATENCY);
+    }
+
+    /**
+     * Emits the indexing latency counters accumulated by this reducer's
+     * {@link LatencyTracker} (created in setup).
+     */
+    @Override
+    public void cleanup(Reducer<Text, NutchWritable, Text, 
NutchIndexAction>.Context context)
+        throws IOException, InterruptedException {
+      // Emit indexing latency metrics
+      indexLatencyTracker.emitCounters(context);
     }
 
     @Override
@@ -343,6 +358,9 @@ public class IndexerMapReduce extends Configured {
         return;
       }
 
+      // Start timing document indexing
+      long indexStart = System.currentTimeMillis();
+
       NutchDocument doc = new NutchDocument();
       doc.add("id", key.toString());
 
@@ -432,6 +450,9 @@ public class IndexerMapReduce extends Configured {
         doc.add("binaryContent", binary);
       }
 
+      // Record indexing latency
+      indexLatencyTracker.record(System.currentTimeMillis() - indexStart);
+
       context.getCounter(NutchMetrics.GROUP_INDEXER,
           NutchMetrics.INDEXER_INDEXED_TOTAL).increment(1);
 
diff --git a/src/java/org/apache/nutch/metrics/LatencyTracker.java 
b/src/java/org/apache/nutch/metrics/LatencyTracker.java
new file mode 100644
index 000000000..3777bb29e
--- /dev/null
+++ b/src/java/org/apache/nutch/metrics/LatencyTracker.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.metrics;
+
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+import com.tdunning.math.stats.TDigest;
+
+/**
+ * A utility class for tracking latency metrics using TDigest for percentile
+ * calculation.
+ * 
+ * <p>This class wraps a TDigest data structure to collect latency samples and
+ * emit Hadoop counters with count, sum, and percentile values (p50, p95, p99).
+ * 
+ * <p>Usage:
+ * <pre>
+ * // In mapper/reducer setup
+ * latencyTracker = new LatencyTracker(NutchMetrics.GROUP_FETCHER, 
NutchMetrics.FETCHER_LATENCY);
+ * 
+ * // During processing
+ * long start = System.currentTimeMillis();
+ * // ... operation ...
+ * latencyTracker.record(System.currentTimeMillis() - start);
+ * 
+ * // In cleanup
+ * latencyTracker.emitCounters(context);
+ * </pre>
+ * 
+ * <p>Emits the following counters:
+ * <ul>
+ *   <li>{prefix}_count_total - total number of samples</li>
+ *   <li>{prefix}_sum_ms - sum of all latencies in milliseconds</li>
+ *   <li>{prefix}_p50_ms - 50th percentile (median) latency</li>
+ *   <li>{prefix}_p95_ms - 95th percentile latency</li>
+ *   <li>{prefix}_p99_ms - 99th percentile latency</li>
+ * </ul>
+ * 
+ * @since 1.22
+ */
+public class LatencyTracker {
+
+  /** Default compression factor for TDigest (controls accuracy vs memory). */
+  private static final double DEFAULT_COMPRESSION = 100.0;
+
+  private final TDigest digest;
+  private final String group;
+  private final String prefix;
+  private long count = 0;
+  private long sum = 0;
+
+  /**
+   * Creates a new LatencyTracker.
+   * 
+   * @param group the Hadoop counter group name
+   * @param prefix the prefix for counter names (e.g., "fetch_latency")
+   */
+  public LatencyTracker(String group, String prefix) {
+    this.digest = TDigest.createDigest(DEFAULT_COMPRESSION);
+    this.group = group;
+    this.prefix = prefix;
+  }
+
+  /**
+   * Records a latency sample.
+   * 
+   * @param latencyMs the latency in milliseconds
+   */
+  public void record(long latencyMs) {
+    digest.add(latencyMs);
+    count++;
+    sum += latencyMs;
+  }
+
+  /**
+   * Returns the number of recorded samples.
+   * 
+   * @return the count of recorded latency samples
+   */
+  public long getCount() {
+    return count;
+  }
+
+  /**
+   * Returns the sum of all recorded latencies.
+   * 
+   * @return the sum of latencies in milliseconds
+   */
+  public long getSum() {
+    return sum;
+  }
+
+  /**
+   * Returns the percentile value for the given quantile.
+   * 
+   * @param quantile the quantile (0.0 to 1.0)
+   * @return the percentile value in milliseconds
+   */
+  public long getPercentile(double quantile) {
+    if (count == 0) {
+      return 0;
+    }
+    return (long) digest.quantile(quantile);
+  }
+
+  /**
+   * Emits all latency counters to the Hadoop context.
+   * 
+   * <p>Should be called once during cleanup to emit aggregated metrics.
+   * 
+   * @param context the Hadoop task context
+   */
+  public void emitCounters(TaskInputOutputContext<?, ?, ?, ?> context) {
+    context.getCounter(group, prefix + "_count_total").setValue(count);
+    context.getCounter(group, prefix + "_sum_ms").setValue(sum);
+    
+    if (count > 0) {
+      context.getCounter(group, prefix + "_p50_ms").setValue((long) 
digest.quantile(0.50));
+      context.getCounter(group, prefix + "_p95_ms").setValue((long) 
digest.quantile(0.95));
+      context.getCounter(group, prefix + "_p99_ms").setValue((long) 
digest.quantile(0.99));
+    } else {
+      // Set to 0 if no samples recorded
+      context.getCounter(group, prefix + "_p50_ms").setValue(0);
+      context.getCounter(group, prefix + "_p95_ms").setValue(0);
+      context.getCounter(group, prefix + "_p99_ms").setValue(0);
+    }
+  }
+}
+
+
diff --git a/src/java/org/apache/nutch/metrics/NutchMetrics.java 
b/src/java/org/apache/nutch/metrics/NutchMetrics.java
index e64a8d6d0..dea34be7f 100644
--- a/src/java/org/apache/nutch/metrics/NutchMetrics.java
+++ b/src/java/org/apache/nutch/metrics/NutchMetrics.java
@@ -367,5 +367,27 @@ public final class NutchMetrics {
 
   /** Empty results in domain statistics. */
   public static final String DOMAIN_STATS_EMPTY_RESULT_TOTAL = 
"empty_result_total";
+
+  // =========================================================================
+  // Latency Metric Prefixes (used with LatencyTracker)
+  // =========================================================================
+
+  /**
+   * Prefix for fetch latency metrics.
+   * Used with {@link LatencyTracker} to emit fetch timing counters.
+   */
+  public static final String FETCHER_LATENCY = "fetch_latency";
+
+  /**
+   * Prefix for parse latency metrics.
+   * Used with {@link LatencyTracker} to emit parse timing counters.
+   */
+  public static final String PARSER_LATENCY = "parse_latency";
+
+  /**
+   * Prefix for indexer latency metrics.
+   * Used with {@link LatencyTracker} to emit indexing timing counters.
+   */
+  public static final String INDEXER_LATENCY = "index_latency";
 }
 
diff --git a/src/java/org/apache/nutch/parse/ParseSegment.java 
b/src/java/org/apache/nutch/parse/ParseSegment.java
index 5ec74ea9f..a7fbe066c 100644
--- a/src/java/org/apache/nutch/parse/ParseSegment.java
+++ b/src/java/org/apache/nutch/parse/ParseSegment.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.nutch.metadata.Metadata;
 import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.metrics.LatencyTracker;
 import org.apache.nutch.metrics.NutchMetrics;
 import org.apache.nutch.net.protocols.Response;
 import org.apache.nutch.protocol.Content;
@@ -81,12 +82,22 @@ public class ParseSegment extends NutchTool implements Tool 
{
     private Text newKey = new Text();
     private ScoringFilters scfilters;
     private boolean skipTruncated;
+    private LatencyTracker parseLatencyTracker;
 
+    /**
+     * Reads the scoring filters and the skip-truncated setting from the job
+     * configuration, and creates the parse latency tracker.
+     */
     @Override
     public void setup(Mapper<WritableComparable<?>, Content, Text, 
ParseImpl>.Context context) {
       Configuration conf = context.getConfiguration();
       scfilters = new ScoringFilters(conf);
       skipTruncated = conf.getBoolean(SKIP_TRUNCATED, true);
+      // Collects the per-document parse time; its counters are emitted in cleanup().
+      parseLatencyTracker = new LatencyTracker(
+          NutchMetrics.GROUP_PARSER, NutchMetrics.PARSER_LATENCY);
+    }
+
+    /**
+     * Emits the parse latency counters accumulated by this mapper's
+     * {@link LatencyTracker} (created in setup).
+     */
+    @Override
+    public void cleanup(Mapper<WritableComparable<?>, Content, Text, 
ParseImpl>.Context context)
+        throws IOException, InterruptedException {
+      // Emit parse latency metrics
+      parseLatencyTracker.emitCounters(context);
     }
 
     @Override
@@ -156,7 +167,9 @@ public class ParseSegment extends NutchTool implements Tool 
{
         }
 
         long end = System.currentTimeMillis();
-        LOG.info("Parsed ({}ms): {}", (end - start), url);
+        long parseTime = end - start;
+        parseLatencyTracker.record(parseTime);
+        LOG.info("Parsed ({}ms): {}", parseTime, url);
 
         context.write(
             url,

Reply via email to