This is an automated email from the ASF dual-hosted git repository.

ritesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new c40cb07053 HDDS-7576. Prometheus metrics do not remove stale metrics until restart (#4057)
c40cb07053 is described below

commit c40cb0705312c705f474df2539e4db0d8088f817
Author: Christos Bisias <[email protected]>
AuthorDate: Thu Dec 22 00:51:04 2022 +0200

    HDDS-7576. Prometheus metrics do not remove stale metrics until restart (#4057)
---
 .../hdds/server/http/PrometheusMetricsSink.java    |  33 +++--
 ....java => TestPrometheusMetricsIntegration.java} | 146 +++++++++++++--------
 .../server/http/TestPrometheusMetricsSink.java     | 146 +--------------------
 3 files changed, 117 insertions(+), 208 deletions(-)

diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/http/PrometheusMetricsSink.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/http/PrometheusMetricsSink.java
index e77553ea3a..320f92efc7 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/http/PrometheusMetricsSink.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/http/PrometheusMetricsSink.java
@@ -45,7 +45,9 @@ public class PrometheusMetricsSink implements MetricsSink {
   /**
    * Cached output lines for each metrics.
    */
-  private final Map<String, Map<String, String>> metricLines =
+  private Map<String, Map<String, String>> metricLines =
+      Collections.synchronizedSortedMap(new TreeMap<>());
+  private Map<String, Map<String, String>> nextMetricLines =
       Collections.synchronizedSortedMap(new TreeMap<>());
 
   private static final Pattern SPLIT_PATTERN =
@@ -59,15 +61,15 @@ public class PrometheusMetricsSink implements MetricsSink {
 
   @Override
   public void putMetrics(MetricsRecord metricsRecord) {
-    for (AbstractMetric metrics : metricsRecord.metrics()) {
-      if (metrics.type() == MetricType.COUNTER
-          || metrics.type() == MetricType.GAUGE) {
+    for (AbstractMetric metric : metricsRecord.metrics()) {
+      if (metric.type() == MetricType.COUNTER
+          || metric.type() == MetricType.GAUGE) {
 
         String metricName = DecayRpcSchedulerUtil
-            .splitMetricNameIfNeeded(metricsRecord.name(), metrics.name());
+            .splitMetricNameIfNeeded(metricsRecord.name(), metric.name());
         // If there is no username this should be null
         String username = DecayRpcSchedulerUtil
-            .checkMetricNameForUsername(metricsRecord.name(), metrics.name());
+            .checkMetricNameForUsername(metricsRecord.name(), metric.name());
 
         String key = prometheusName(
             metricsRecord.name(), metricName);
@@ -78,11 +80,13 @@ public class PrometheusMetricsSink implements MetricsSink {
         String metricKey = "# TYPE "
             + key
             + " "
-            + metrics.type().toString().toLowerCase();
+            + metric.type().toString().toLowerCase();
 
-        metricLines.computeIfAbsent(metricKey,
-            any -> Collections.synchronizedSortedMap(new TreeMap<>()))
-            .put(prometheusMetricKeyAsString, String.valueOf(metrics.value()));
+        synchronized (this) {
+          nextMetricLines.computeIfAbsent(metricKey,
+                  any -> Collections.synchronizedSortedMap(new TreeMap<>()))
+              .put(prometheusMetricKeyAsString, 
String.valueOf(metric.value()));
+        }
       }
     }
   }
@@ -146,7 +150,11 @@ public class PrometheusMetricsSink implements MetricsSink {
 
   @Override
   public void flush() {
-
+    synchronized (this) {
+      metricLines = nextMetricLines;
+      nextMetricLines = Collections
+          .synchronizedSortedMap(new TreeMap<>());
+    }
   }
 
   @Override
@@ -154,7 +162,8 @@ public class PrometheusMetricsSink implements MetricsSink {
 
   }
 
-  public void writeMetrics(Writer writer) throws IOException {
+  public synchronized void writeMetrics(Writer writer)
+      throws IOException {
     for (Map.Entry<String, Map<String, String>> metricsEntry
         : metricLines.entrySet()) {
       writer.write(metricsEntry.getKey() + "\n");
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/http/TestPrometheusMetricsSink.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/http/TestPrometheusMetricsIntegration.java
similarity index 56%
copy from hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/http/TestPrometheusMetricsSink.java
copy to hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/http/TestPrometheusMetricsIntegration.java
index 5339a06bbb..bc2ccef57d 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/http/TestPrometheusMetricsSink.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/http/TestPrometheusMetricsIntegration.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,6 +22,8 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
+import java.util.concurrent.TimeoutException;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.metrics2.MetricsInfo;
 import org.apache.hadoop.metrics2.MetricsSource;
@@ -31,15 +33,16 @@ import org.apache.hadoop.metrics2.annotation.Metric;
 import org.apache.hadoop.metrics2.annotation.Metrics;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.ozone.test.GenericTestUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 /**
- * Test prometheus Sink.
+ * Test prometheus Metrics.
  */
-public class TestPrometheusMetricsSink {
+public class TestPrometheusMetricsIntegration {
 
   private MetricsSystem metrics;
   private PrometheusMetricsSink sink;
@@ -87,15 +90,15 @@ public class TestPrometheusMetricsSink {
   }
 
   @Test
-  public void testPublish() throws IOException {
+  public void testPublish()
+      throws InterruptedException, TimeoutException {
     //GIVEN
     TestMetrics testMetrics = metrics
         .register("TestMetrics", "Testing metrics", new TestMetrics());
 
     testMetrics.numBucketCreateFails.incr();
 
-    //WHEN
-    String writtenMetrics = publishMetricsAndGetOutput();
+    String writtenMetrics = waitForMetricsToPublish("test_metrics_num");
 
     //THEN
     Assertions.assertTrue(
@@ -103,11 +106,14 @@ public class TestPrometheusMetricsSink {
             "test_metrics_num_bucket_create_fails{context=\"dfs\""),
         "The expected metric line is missing from prometheus metrics output"
     );
+
+    metrics.unregisterSource("TestMetrics");
   }
 
   @Test
-  public void testPublishWithSameName() throws IOException {
-    //GIVEN
+  public void testPublishWithSameName()
+      throws InterruptedException, TimeoutException {
+    // GIVEN
     metrics.register("FooBar", "fooBar", (MetricsSource) (collector, all) -> {
       collector.addRecord("RpcMetrics").add(new MetricsTag(PORT_INFO, "1234"))
           .addGauge(COUNTER_INFO, COUNTER_1).endRecord();
@@ -116,8 +122,7 @@ public class TestPrometheusMetricsSink {
           PORT_INFO, "2345")).addGauge(COUNTER_INFO, COUNTER_2).endRecord();
     });
 
-    // WHEN
-    String writtenMetrics = publishMetricsAndGetOutput();
+    String writtenMetrics = waitForMetricsToPublish("rpc_metrics_counter");
 
     // THEN
     Assertions.assertTrue(
@@ -127,11 +132,14 @@ public class TestPrometheusMetricsSink {
     Assertions.assertTrue(
         writtenMetrics.contains("rpc_metrics_counter{port=\"1234\""),
         "The expected metric line is missing from prometheus metrics output");
+
+    metrics.unregisterSource("FooBar");
   }
 
   @Test
-  public void testTypeWithSameNameButDifferentLabels() throws IOException {
-    //GIVEN
+  public void testTypeWithSameNameButDifferentLabels()
+      throws InterruptedException, TimeoutException {
+    // GIVEN
     metrics.register("SameName", "sameName",
         (MetricsSource) (collector, all) -> {
           collector.addRecord("SameName").add(new MetricsTag(PORT_INFO, 
"1234"))
@@ -141,59 +149,63 @@ public class TestPrometheusMetricsSink {
         });
 
     // WHEN
-    String writtenMetrics = publishMetricsAndGetOutput();
+    String writtenMetrics = waitForMetricsToPublish("same_name_counter");
 
     // THEN
     Assertions.assertEquals(1, StringUtils.countMatches(writtenMetrics,
         "# TYPE same_name_counter"));
-  }
 
-  @Test
-  public void testNamingCamelCase() {
-    //THEN
-    Assertions.assertEquals("rpc_time_some_metrics",
-        sink.prometheusName("RpcTime", "SomeMetrics"));
-
-    Assertions.assertEquals("om_rpc_time_om_info_keys",
-        sink.prometheusName("OMRpcTime", "OMInfoKeys"));
-
-    Assertions.assertEquals("rpc_time_small",
-        sink.prometheusName("RpcTime", "small"));
-  }
+    // both metrics should be present
+    Assertions.assertTrue(
+        writtenMetrics.contains("same_name_counter{port=\"1234\""),
+        "The expected metric line is present in prometheus metrics output");
+    Assertions.assertTrue(
+        writtenMetrics.contains("same_name_counter{port=\"2345\""),
+        "The expected metric line is present in prometheus metrics output");
 
-  @Test
-  public void testNamingRocksDB() {
-    //RocksDB metrics are handled differently.
-    // THEN
-    Assertions.assertEquals("rocksdb_om_db_num_open_connections",
-        sink.prometheusName("Rocksdb_om.db", "num_open_connections"));
+    metrics.unregisterSource("SameName");
   }
 
+  /**
+   * Make sure Prometheus metrics start fresh after each flush.
+   * Publish the metrics and flush them,
+   * then unregister one of them and register another.
+   * Publish and flush the metrics again
+   * and then check that the unregistered metric is not present.
+   */
   @Test
-  public void testNamingPipeline() {
+  public void testRemovingStaleMetricsOnFlush()
+      throws InterruptedException, TimeoutException {
     // GIVEN
-    String recordName = "SCMPipelineMetrics";
-    String metricName = "NumBlocksAllocated-"
-        + "RATIS-THREE-47659e3d-40c9-43b3-9792-4982fc279aba";
+    metrics.register("StaleMetric", "staleMetric",
+        (MetricsSource) (collector, all) ->
+            collector.addRecord("StaleMetric")
+                .add(new MetricsTag(PORT_INFO, "1234"))
+                .addGauge(COUNTER_INFO, COUNTER_1).endRecord());
 
-    // THEN
-    Assertions.assertEquals(
-        "scm_pipeline_metrics_"
-            + "num_blocks_allocated_"
-            + "ratis_three_47659e3d_40c9_43b3_9792_4982fc279aba",
-        sink.prometheusName(recordName, metricName));
-  }
+    waitForMetricsToPublish("stale_metric_counter");
 
-  @Test
-  public void testNamingSpaces() {
-    //GIVEN
-    String recordName = "JvmMetrics";
-    String metricName = "GcTimeMillisG1 Young Generation";
+    // unregister the metric
+    metrics.unregisterSource("StaleMetric");
+
+    metrics.register("SomeMetric", "someMetric",
+        (MetricsSource) (collector, all) ->
+            collector.addRecord("SomeMetric")
+                .add(new MetricsTag(PORT_INFO, "4321"))
+                .addGauge(COUNTER_INFO, COUNTER_2).endRecord());
+
+    String writtenMetrics = waitForMetricsToPublish("some_metric_counter");
 
     // THEN
-    Assertions.assertEquals(
-        "jvm_metrics_gc_time_millis_g1_young_generation",
-        sink.prometheusName(recordName, metricName));
+    // The first metric shouldn't be present
+    Assertions.assertFalse(
+        writtenMetrics.contains("stale_metric_counter{port=\"1234\""),
+        "The expected metric line is present in prometheus metrics output");
+    Assertions.assertTrue(
+        writtenMetrics.contains("some_metric_counter{port=\"4321\""),
+        "The expected metric line is present in prometheus metrics output");
+
+    metrics.unregisterSource("SomeMetric");
   }
 
   private String publishMetricsAndGetOutput() throws IOException {
@@ -204,10 +216,36 @@ public class TestPrometheusMetricsSink {
 
     sink.writeMetrics(writer);
     writer.flush();
-
     return stream.toString(UTF_8.name());
   }
 
+  /**
+   * metrics.publishMetricsNow() might not finish in a reasonable
+   * amount of time leading to a full queue and any further attempt
+   * for publishing to fail. Wrapping the call with
+   * GenericTestUtils.waitFor() to retry until the queue has been
+   * cleared and publish is a success.
+   *
+   * @param registeredMetric to check if it's published
+   * @return all published metrics
+   */
+  private String waitForMetricsToPublish(String registeredMetric)
+      throws InterruptedException, TimeoutException {
+
+    final String[] writtenMetrics = new String[1];
+
+    GenericTestUtils.waitFor(() -> {
+      try {
+        writtenMetrics[0] = publishMetricsAndGetOutput();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      return writtenMetrics[0].contains(registeredMetric);
+    }, 1000, 120000);
+
+    return writtenMetrics[0];
+  }
+
   /**
    * Example metric pojo.
    */
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/http/TestPrometheusMetricsSink.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/http/TestPrometheusMetricsSink.java
index 5339a06bbb..dc15d805d6 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/http/TestPrometheusMetricsSink.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/http/TestPrometheusMetricsSink.java
@@ -17,23 +17,8 @@
  */
 package org.apache.hadoop.hdds.server.http;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.metrics2.MetricsInfo;
-import org.apache.hadoop.metrics2.MetricsSource;
-import org.apache.hadoop.metrics2.MetricsSystem;
-import org.apache.hadoop.metrics2.MetricsTag;
-import org.apache.hadoop.metrics2.annotation.Metric;
-import org.apache.hadoop.metrics2.annotation.Metrics;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.apache.hadoop.metrics2.lib.MutableCounterLong;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
 /**
@@ -41,111 +26,11 @@ import org.junit.jupiter.api.Test;
  */
 public class TestPrometheusMetricsSink {
 
-  private MetricsSystem metrics;
-  private PrometheusMetricsSink sink;
-
-  private static final MetricsInfo PORT_INFO = new MetricsInfo() {
-    @Override
-    public String name() {
-      return "PORT";
-    }
-
-    @Override
-    public String description() {
-      return "port";
-    }
-  };
-
-  private static final MetricsInfo COUNTER_INFO = new MetricsInfo() {
-    @Override
-    public String name() {
-      return "COUNTER";
-    }
-
-    @Override
-    public String description() {
-      return "counter";
-    }
-  };
-
-  private static final int COUNTER_1 = 123;
-  private static final int COUNTER_2 = 234;
+  private static PrometheusMetricsSink sink;
 
-  @BeforeEach
-  public void init() {
-    metrics = DefaultMetricsSystem.instance();
-
-    metrics.init("test");
+  @BeforeAll
+  public static void setUp() {
     sink = new PrometheusMetricsSink();
-    metrics.register("Prometheus", "Prometheus", sink);
-  }
-
-  @AfterEach
-  public void tearDown() {
-    metrics.stop();
-    metrics.shutdown();
-  }
-
-  @Test
-  public void testPublish() throws IOException {
-    //GIVEN
-    TestMetrics testMetrics = metrics
-        .register("TestMetrics", "Testing metrics", new TestMetrics());
-
-    testMetrics.numBucketCreateFails.incr();
-
-    //WHEN
-    String writtenMetrics = publishMetricsAndGetOutput();
-
-    //THEN
-    Assertions.assertTrue(
-        writtenMetrics.contains(
-            "test_metrics_num_bucket_create_fails{context=\"dfs\""),
-        "The expected metric line is missing from prometheus metrics output"
-    );
-  }
-
-  @Test
-  public void testPublishWithSameName() throws IOException {
-    //GIVEN
-    metrics.register("FooBar", "fooBar", (MetricsSource) (collector, all) -> {
-      collector.addRecord("RpcMetrics").add(new MetricsTag(PORT_INFO, "1234"))
-          .addGauge(COUNTER_INFO, COUNTER_1).endRecord();
-
-      collector.addRecord("RpcMetrics").add(new MetricsTag(
-          PORT_INFO, "2345")).addGauge(COUNTER_INFO, COUNTER_2).endRecord();
-    });
-
-    // WHEN
-    String writtenMetrics = publishMetricsAndGetOutput();
-
-    // THEN
-    Assertions.assertTrue(
-        writtenMetrics.contains("rpc_metrics_counter{port=\"2345\""),
-        "The expected metric line is missing from prometheus metrics output");
-
-    Assertions.assertTrue(
-        writtenMetrics.contains("rpc_metrics_counter{port=\"1234\""),
-        "The expected metric line is missing from prometheus metrics output");
-  }
-
-  @Test
-  public void testTypeWithSameNameButDifferentLabels() throws IOException {
-    //GIVEN
-    metrics.register("SameName", "sameName",
-        (MetricsSource) (collector, all) -> {
-          collector.addRecord("SameName").add(new MetricsTag(PORT_INFO, 
"1234"))
-              .addGauge(COUNTER_INFO, COUNTER_1).endRecord();
-          collector.addRecord("SameName").add(new MetricsTag(PORT_INFO, 
"2345"))
-              .addGauge(COUNTER_INFO, COUNTER_2).endRecord();
-        });
-
-    // WHEN
-    String writtenMetrics = publishMetricsAndGetOutput();
-
-    // THEN
-    Assertions.assertEquals(1, StringUtils.countMatches(writtenMetrics,
-        "# TYPE same_name_counter"));
   }
 
   @Test
@@ -195,27 +80,4 @@ public class TestPrometheusMetricsSink {
         "jvm_metrics_gc_time_millis_g1_young_generation",
         sink.prometheusName(recordName, metricName));
   }
-
-  private String publishMetricsAndGetOutput() throws IOException {
-    metrics.publishMetricsNow();
-
-    ByteArrayOutputStream stream = new ByteArrayOutputStream();
-    OutputStreamWriter writer = new OutputStreamWriter(stream, UTF_8);
-
-    sink.writeMetrics(writer);
-    writer.flush();
-
-    return stream.toString(UTF_8.name());
-  }
-
-  /**
-   * Example metric pojo.
-   */
-  @Metrics(about = "Test Metrics", context = "dfs")
-  private static class TestMetrics {
-
-    @Metric
-    private MutableCounterLong numBucketCreateFails;
-  }
-
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to