This is an automated email from the ASF dual-hosted git repository.

inigoiri pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3e2ae1da00e HDFS-16949 Introduce inverse quantiles for metrics where 
higher numer… (#5495)
3e2ae1da00e is described below

commit 3e2ae1da00e055211914c90cca89d62432096530
Author: rdingankar <ravindra.dingan...@asu.edu>
AuthorDate: Mon Apr 10 08:56:00 2023 -0700

    HDFS-16949 Introduce inverse quantiles for metrics where higher numer… 
(#5495)
---
 .../hadoop/metrics2/lib/MetricsRegistry.java       | 25 +++++-
 .../metrics2/lib/MutableInverseQuantiles.java      | 89 ++++++++++++++++++++
 .../hadoop/metrics2/lib/MutableQuantiles.java      | 95 +++++++++++++++++-----
 .../hadoop/metrics2/util/TestSampleQuantiles.java  | 68 +++++++++++++---
 .../org/apache/hadoop/test/MetricsAsserts.java     | 25 +++++-
 .../server/datanode/metrics/DataNodeMetrics.java   |  2 +-
 .../hdfs/server/datanode/TestDataNodeMetrics.java  |  3 +-
 7 files changed, 271 insertions(+), 36 deletions(-)

diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MetricsRegistry.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MetricsRegistry.java
index b71f7f8cc5e..31031b808ea 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MetricsRegistry.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MetricsRegistry.java
@@ -227,6 +227,29 @@ public class MetricsRegistry {
     return ret;
   }
 
+  /**
+   * Create a mutable inverse metric that estimates inverse quantiles of a 
stream of values.
+   * @param name of the metric
+   * @param desc metric description
+   * @param sampleName of the metric (e.g., "Ops")
+   * @param valueName of the metric (e.g., "Rate")
+   * @param interval rollover interval of estimator in seconds
+   * @return a new inverse quantile estimator object
+   * @throws MetricsException if interval is not a positive integer
+   */
+  public synchronized MutableQuantiles newInverseQuantiles(String name, String 
desc,
+      String sampleName, String valueName, int interval) {
+    checkMetricName(name);
+    if (interval <= 0) {
+      throw new MetricsException("Interval should be positive.  Value passed" +
+          " is: " + interval);
+    }
+    MutableQuantiles ret =
+        new MutableInverseQuantiles(name, desc, sampleName, valueName, 
interval);
+    metricsMap.put(name, ret);
+    return ret;
+  }
+
   /**
    * Create a mutable metric with stats
    * @param name  of the metric
@@ -278,7 +301,7 @@ public class MetricsRegistry {
   }
 
   /**
-   * Create a mutable rate metric (for throughput measurement)
+   * Create a mutable rate metric (for throughput measurement).
    * @param name  of the metric
    * @param desc  description
    * @param extended  produce extended stat (stdev/min/max etc.) if true
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableInverseQuantiles.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableInverseQuantiles.java
new file mode 100644
index 00000000000..a3d579cb9e7
--- /dev/null
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableInverseQuantiles.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.lib;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.metrics2.util.Quantile;
+import org.apache.hadoop.metrics2.util.SampleQuantiles;
+import java.text.DecimalFormat;
+import static org.apache.hadoop.metrics2.lib.Interns.info;
+
+/**
+ * Watches a stream of long values, maintaining online estimates of specific
+ * quantiles with provably low error bounds. Inverse quantiles are meant for
+ * highly accurate low-percentile (e.g. 1st, 5th) metrics.
+ * InverseQuantiles are used for metrics where a higher value is better
+ * (e.g., data transfer rate).
+ * The 1st percentile here corresponds to the 99th inverse percentile metric,
+ * 5th percentile to 95th and so on.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class MutableInverseQuantiles extends MutableQuantiles{
+
+  static class InversePercentile extends Quantile {
+    InversePercentile(double inversePercentile) {
+      super(inversePercentile/100, inversePercentile/1000);
+    }
+  }
+
+  @VisibleForTesting
+  public static final Quantile[] INVERSE_QUANTILES = {new 
InversePercentile(50),
+      new InversePercentile(25), new InversePercentile(10),
+      new InversePercentile(5), new InversePercentile(1)};
+
+  /**
+   * Instantiates a new {@link MutableInverseQuantiles} for a metric that 
rolls itself
+   * over on the specified time interval.
+   *
+   * @param name          of the metric
+   * @param description   long-form textual description of the metric
+   * @param sampleName    type of items in the stream (e.g., "Ops")
+   * @param valueName     type of the values
+   * @param intervalSecs  rollover interval (in seconds) of the estimator
+   */
+  public MutableInverseQuantiles(String name, String description, String 
sampleName,
+      String valueName, int intervalSecs) {
+    super(name, description, sampleName, valueName, intervalSecs);
+  }
+
+  /**
+   * Sets quantileInfo and estimator.
+   *
+   * @param ucName capitalized name of the metric
+   * @param uvName capitalized type of the values
+   * @param desc uncapitalized long-form textual description of the metric
+   * @param lvName uncapitalized type of the values
+   * @param df Number formatter for inverse percentile value
+   */
+  void setQuantiles(String ucName, String uvName, String desc, String lvName, 
DecimalFormat df) {
+    // Construct the MetricsInfos for inverse quantiles, converting to inverse 
percentiles
+    setQuantileInfos(INVERSE_QUANTILES.length);
+    for (int i = 0; i < INVERSE_QUANTILES.length; i++) {
+      double inversePercentile = 100 * (1 - INVERSE_QUANTILES[i].quantile);
+      String nameTemplate = ucName + df.format(inversePercentile) + 
"thInversePercentile" + uvName;
+      String descTemplate = df.format(inversePercentile) + " inverse 
percentile " + lvName
+          + " with " + getInterval() + " second interval for " + desc;
+      addQuantileInfo(i, info(nameTemplate, descTemplate));
+    }
+
+    setEstimator(new SampleQuantiles(INVERSE_QUANTILES));
+  }
+}
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableQuantiles.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableQuantiles.java
index f7dfaffb3f9..edb2159f17b 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableQuantiles.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableQuantiles.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.metrics2.lib;
 
 import static org.apache.hadoop.metrics2.lib.Interns.info;
 
+import java.text.DecimalFormat;
 import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -52,9 +53,10 @@ public class MutableQuantiles extends MutableMetric {
       new Quantile(0.75, 0.025), new Quantile(0.90, 0.010),
       new Quantile(0.95, 0.005), new Quantile(0.99, 0.001) };
 
-  private final MetricsInfo numInfo;
-  private final MetricsInfo[] quantileInfos;
-  private final int interval;
+  private MetricsInfo numInfo;
+  private MetricsInfo[] quantileInfos;
+  private int intervalSecs;
+  private static DecimalFormat decimalFormat = new DecimalFormat("###.####");
 
   private QuantileEstimator estimator;
   private long previousCount = 0;
@@ -91,26 +93,39 @@ public class MutableQuantiles extends MutableMetric {
     String lsName = StringUtils.uncapitalize(sampleName);
     String lvName = StringUtils.uncapitalize(valueName);
 
-    numInfo = info(ucName + "Num" + usName, String.format(
-        "Number of %s for %s with %ds interval", lsName, desc, interval));
+    setInterval(interval);
+    setNumInfo(info(ucName + "Num" + usName, String.format(
+        "Number of %s for %s with %ds interval", lsName, desc, interval)));
+    scheduledTask = scheduler.scheduleWithFixedDelay(new RolloverSample(this),
+        interval, interval, TimeUnit.SECONDS);
+    setQuantiles(ucName, uvName, desc, lvName, decimalFormat);
+  }
+
+  /**
+   * Sets quantileInfo and estimator.
+   *
+   * @param ucName capitalized name of the metric
+   * @param uvName capitalized type of the values
+   * @param desc uncapitalized long-form textual description of the metric
+   * @param lvName uncapitalized type of the values
+   * @param pDecimalFormat Number formatter for percentile value
+   */
+  void setQuantiles(String ucName, String uvName, String desc, String lvName, 
DecimalFormat pDecimalFormat) {
     // Construct the MetricsInfos for the quantiles, converting to percentiles
-    quantileInfos = new MetricsInfo[quantiles.length];
-    String nameTemplate = ucName + "%dthPercentile" + uvName;
-    String descTemplate = "%d percentile " + lvName + " with " + interval
-        + " second interval for " + desc;
+    setQuantileInfos(quantiles.length);
     for (int i = 0; i < quantiles.length; i++) {
-      int percentile = (int) (100 * quantiles[i].quantile);
-      quantileInfos[i] = info(String.format(nameTemplate, percentile),
-          String.format(descTemplate, percentile));
+      double percentile = 100 * quantiles[i].quantile;
+      String nameTemplate = ucName + pDecimalFormat.format(percentile) + 
"thPercentile" + uvName;
+      String descTemplate = pDecimalFormat.format(percentile) + " percentile " 
+ lvName
+          + " with " + getInterval() + " second interval for " + desc;
+      addQuantileInfo(i, info(nameTemplate, descTemplate));
     }
 
-    estimator = new SampleQuantiles(quantiles);
-
-    this.interval = interval;
-    scheduledTask = scheduler.scheduleWithFixedDelay(new RolloverSample(this),
-        interval, interval, TimeUnit.SECONDS);
+    setEstimator(new SampleQuantiles(quantiles));
   }
 
+  public MutableQuantiles() {}
+
   @Override
   public synchronized void snapshot(MetricsRecordBuilder builder, boolean all) 
{
     if (all || changed()) {
@@ -133,8 +148,50 @@ public class MutableQuantiles extends MutableMetric {
     estimator.insert(value);
   }
 
-  public int getInterval() {
-    return interval;
+  /**
+   * Set info about the metrics.
+   *
+   * @param pNumInfo info about the metrics.
+   */
+  public synchronized void setNumInfo(MetricsInfo pNumInfo) {
+    this.numInfo = pNumInfo;
+  }
+
+  /**
+   * Initialize quantileInfos array.
+   *
+   * @param length of the quantileInfos array.
+   */
+  public synchronized void setQuantileInfos(int length) {
+    this.quantileInfos = new MetricsInfo[length];
+  }
+
+  /**
+   * Add entry to quantileInfos array.
+   *
+   * @param i array index.
+   * @param info info to be added to the quantileInfos array.
+   */
+  public synchronized void addQuantileInfo(int i, MetricsInfo info) {
+    this.quantileInfos[i] = info;
+  }
+
+  /**
+   * Set the rollover interval (in seconds) of the estimator.
+   *
+   * @param pIntervalSecs of the estimator.
+   */
+  public synchronized void setInterval(int pIntervalSecs) {
+    this.intervalSecs = pIntervalSecs;
+  }
+
+  /**
+   * Get the rollover interval (in seconds) of the estimator.
+   *
+   * @return  intervalSecs of the estimator.
+   */
+  public synchronized int getInterval() {
+    return intervalSecs;
   }
 
   public void stop() {
diff --git 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/util/TestSampleQuantiles.java
 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/util/TestSampleQuantiles.java
index c7d8f60b181..aefd7a264b0 100644
--- 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/util/TestSampleQuantiles.java
+++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/util/TestSampleQuantiles.java
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.Random;
 
+import org.apache.hadoop.metrics2.lib.MutableInverseQuantiles;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -36,6 +37,7 @@ public class TestSampleQuantiles {
       new Quantile(0.95, 0.005), new Quantile(0.99, 0.001) };
 
   SampleQuantiles estimator;
+  final static int NUM_REPEATS = 10;
 
   @Before
   public void init() {
@@ -91,28 +93,70 @@ public class TestSampleQuantiles {
   @Test
   public void testQuantileError() throws IOException {
     final int count = 100000;
-    Random r = new Random(0xDEADDEAD);
-    Long[] values = new Long[count];
+    Random rnd = new Random(0xDEADDEAD);
+    int[] values = new int[count];
     for (int i = 0; i < count; i++) {
-      values[i] = (long) (i + 1);
+      values[i] = i + 1;
     }
-    // Do 10 shuffle/insert/check cycles
-    for (int i = 0; i < 10; i++) {
-      System.out.println("Starting run " + i);
-      Collections.shuffle(Arrays.asList(values), r);
+
+    // Repeat shuffle/insert/check cycles 10 times
+    for (int i = 0; i < NUM_REPEATS; i++) {
+
+      // Shuffle (NOTE(review): Arrays.asList(int[]) is a one-element List<int[]>, so this is a no-op)
+      Collections.shuffle(Arrays.asList(values), rnd);
       estimator.clear();
-      for (int j = 0; j < count; j++) {
-        estimator.insert(values[j]);
+
+      // Insert
+      for (int value : values) {
+        estimator.insert(value);
       }
       Map<Quantile, Long> snapshot;
       snapshot = estimator.snapshot();
+
+      // Check
       for (Quantile q : quantiles) {
         long actual = (long) (q.quantile * count);
         long error = (long) (q.error * count);
         long estimate = snapshot.get(q);
-        System.out
-            .println(String.format("Expected %d with error %d, estimated %d",
-                actual, error, estimate));
+        assertThat(estimate <= actual + error).isTrue();
+        assertThat(estimate >= actual - error).isTrue();
+      }
+    }
+  }
+
+  /**
+   * Correctness test that checks that absolute error of the estimate for 
inverse quantiles
+   * is within specified error bounds for some randomly permuted streams of 
items.
+   */
+  @Test
+  public void testInverseQuantiles() throws IOException {
+    SampleQuantiles inverseQuantilesEstimator =
+        new SampleQuantiles(MutableInverseQuantiles.INVERSE_QUANTILES);
+    final int count = 100000;
+    Random rnd = new Random(0xDEADDEAD);
+    int[] values = new int[count];
+    for (int i = 0; i < count; i++) {
+      values[i] = i + 1;
+    }
+
+    // Repeat shuffle/insert/check cycles 10 times
+    for (int i = 0; i < NUM_REPEATS; i++) {
+      // Shuffle (NOTE(review): Arrays.asList(int[]) is a one-element List<int[]>, so this is a no-op)
+      Collections.shuffle(Arrays.asList(values), rnd);
+      inverseQuantilesEstimator.clear();
+
+      // Insert
+      for (int value : values) {
+        inverseQuantilesEstimator.insert(value);
+      }
+      Map<Quantile, Long> snapshot;
+      snapshot = inverseQuantilesEstimator.snapshot();
+
+      // Check
+      for (Quantile q : MutableInverseQuantiles.INVERSE_QUANTILES) {
+        long actual = (long) (q.quantile * count);
+        long error = (long) (q.error * count);
+        long estimate = snapshot.get(q);
         assertThat(estimate <= actual + error).isTrue();
         assertThat(estimate >= actual - error).isTrue();
       }
diff --git 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java
 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java
index 9132e20210a..8210322f8f4 100644
--- 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java
+++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java
@@ -392,13 +392,34 @@ public class MetricsAsserts {
    */
   public static void assertQuantileGauges(String prefix,
       MetricsRecordBuilder rb, String valueName) {
-    verify(rb).addGauge(eqName(info(prefix + "NumOps", "")), geq(0l));
+    verify(rb).addGauge(eqName(info(prefix + "NumOps", "")), geq(0L));
     for (Quantile q : MutableQuantiles.quantiles) {
       String nameTemplate = prefix + "%dthPercentile" + valueName;
       int percentile = (int) (100 * q.quantile);
       verify(rb).addGauge(
           eqName(info(String.format(nameTemplate, percentile), "")),
-          geq(0l));
+          geq(0L));
+    }
+  }
+
+  /**
+   * Asserts that the NumOps and inverse quantile gauges for a metric have been 
added with
+   * a non-negative value, for the specified value name of the
+   * metrics (e.g., "Rate").
+   *
+   * @param prefix of the metric
+   * @param rb MetricsRecordBuilder with the metric
+   * @param valueName the value name for the metric
+   */
+  public static void assertInverseQuantileGauges(String prefix,
+      MetricsRecordBuilder rb, String valueName) {
+    verify(rb).addGauge(eqName(info(prefix + "NumOps", "")), geq(0L));
+    for (Quantile q : MutableQuantiles.quantiles) {
+      String nameTemplate = prefix + "%dthInversePercentile" + valueName;
+      int percentile = (int) (100 * q.quantile);
+      verify(rb).addGauge(
+          eqName(info(String.format(nameTemplate, percentile), "")),
+          geq(0L));
     }
   }
 
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
index 675dbbff4c3..c3aa3c3a454 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
@@ -258,7 +258,7 @@ public class DataNodeMetrics {
           "ramDiskBlocksLazyPersistWindows" + interval + "s",
           "Time between the RamDisk block write and disk persist in ms",
           "ops", "latency", interval);
-      readTransferRateQuantiles[i] = registry.newQuantiles(
+      readTransferRateQuantiles[i] = registry.newInverseQuantiles(
           "readTransferRate" + interval + "s",
           "Rate at which bytes are read from datanode calculated in bytes per 
second",
           "ops", "rate", interval);
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
index de5c985a4f0..35f7924be11 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.datanode;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
 import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static 
org.apache.hadoop.test.MetricsAsserts.assertInverseQuantileGauges;
 import static org.apache.hadoop.test.MetricsAsserts.assertQuantileGauges;
 import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
@@ -413,7 +414,7 @@ public class TestDataNodeMetrics {
           final long endWriteValue = getLongCounter("TotalWriteTime", rbNew);
           final long endReadValue = getLongCounter("TotalReadTime", rbNew);
           assertCounter("ReadTransferRateNumOps", 1L, rbNew);
-          assertQuantileGauges("ReadTransferRate" + "60s", rbNew, "Rate");
+          assertInverseQuantileGauges("ReadTransferRate60s", rbNew, "Rate");
           return endWriteValue > startWriteValue
               && endReadValue > startReadValue;
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to