This is an automated email from the ASF dual-hosted git repository.
ritesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new c40cb07053 HDDS-7576. Prometheus metrics do not remove stale metrics until restart (#4057)
c40cb07053 is described below
commit c40cb0705312c705f474df2539e4db0d8088f817
Author: Christos Bisias <[email protected]>
AuthorDate: Thu Dec 22 00:51:04 2022 +0200
HDDS-7576. Prometheus metrics do not remove stale metrics until restart (#4057)
---
.../hdds/server/http/PrometheusMetricsSink.java | 33 +++--
....java => TestPrometheusMetricsIntegration.java} | 146 +++++++++++++--------
.../server/http/TestPrometheusMetricsSink.java | 146 +--------------------
3 files changed, 117 insertions(+), 208 deletions(-)
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/http/PrometheusMetricsSink.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/http/PrometheusMetricsSink.java
index e77553ea3a..320f92efc7 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/http/PrometheusMetricsSink.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/http/PrometheusMetricsSink.java
@@ -45,7 +45,9 @@ public class PrometheusMetricsSink implements MetricsSink {
/**
* Cached output lines for each metrics.
*/
- private final Map<String, Map<String, String>> metricLines =
+ private Map<String, Map<String, String>> metricLines =
+ Collections.synchronizedSortedMap(new TreeMap<>());
+ private Map<String, Map<String, String>> nextMetricLines =
Collections.synchronizedSortedMap(new TreeMap<>());
private static final Pattern SPLIT_PATTERN =
@@ -59,15 +61,15 @@ public class PrometheusMetricsSink implements MetricsSink {
@Override
public void putMetrics(MetricsRecord metricsRecord) {
- for (AbstractMetric metrics : metricsRecord.metrics()) {
- if (metrics.type() == MetricType.COUNTER
- || metrics.type() == MetricType.GAUGE) {
+ for (AbstractMetric metric : metricsRecord.metrics()) {
+ if (metric.type() == MetricType.COUNTER
+ || metric.type() == MetricType.GAUGE) {
String metricName = DecayRpcSchedulerUtil
- .splitMetricNameIfNeeded(metricsRecord.name(), metrics.name());
+ .splitMetricNameIfNeeded(metricsRecord.name(), metric.name());
// If there is no username this should be null
String username = DecayRpcSchedulerUtil
- .checkMetricNameForUsername(metricsRecord.name(), metrics.name());
+ .checkMetricNameForUsername(metricsRecord.name(), metric.name());
String key = prometheusName(
metricsRecord.name(), metricName);
@@ -78,11 +80,13 @@ public class PrometheusMetricsSink implements MetricsSink {
String metricKey = "# TYPE "
+ key
+ " "
- + metrics.type().toString().toLowerCase();
+ + metric.type().toString().toLowerCase();
- metricLines.computeIfAbsent(metricKey,
- any -> Collections.synchronizedSortedMap(new TreeMap<>()))
- .put(prometheusMetricKeyAsString, String.valueOf(metrics.value()));
+ synchronized (this) {
+ nextMetricLines.computeIfAbsent(metricKey,
+ any -> Collections.synchronizedSortedMap(new TreeMap<>()))
+ .put(prometheusMetricKeyAsString,
String.valueOf(metric.value()));
+ }
}
}
}
@@ -146,7 +150,11 @@ public class PrometheusMetricsSink implements MetricsSink {
@Override
public void flush() {
-
+ synchronized (this) {
+ metricLines = nextMetricLines;
+ nextMetricLines = Collections
+ .synchronizedSortedMap(new TreeMap<>());
+ }
}
@Override
@@ -154,7 +162,8 @@ public class PrometheusMetricsSink implements MetricsSink {
}
- public void writeMetrics(Writer writer) throws IOException {
+ public synchronized void writeMetrics(Writer writer)
+ throws IOException {
for (Map.Entry<String, Map<String, String>> metricsEntry
: metricLines.entrySet()) {
writer.write(metricsEntry.getKey() + "\n");
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/http/TestPrometheusMetricsSink.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/http/TestPrometheusMetricsIntegration.java
similarity index 56%
copy from hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/http/TestPrometheusMetricsSink.java
copy to hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/http/TestPrometheusMetricsIntegration.java
index 5339a06bbb..bc2ccef57d 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/http/TestPrometheusMetricsSink.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/http/TestPrometheusMetricsIntegration.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,6 +22,8 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
+import java.util.concurrent.TimeoutException;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.metrics2.MetricsInfo;
import org.apache.hadoop.metrics2.MetricsSource;
@@ -31,15 +33,16 @@ import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.ozone.test.GenericTestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
/**
- * Test prometheus Sink.
+ * Test prometheus Metrics.
*/
-public class TestPrometheusMetricsSink {
+public class TestPrometheusMetricsIntegration {
private MetricsSystem metrics;
private PrometheusMetricsSink sink;
@@ -87,15 +90,15 @@ public class TestPrometheusMetricsSink {
}
@Test
- public void testPublish() throws IOException {
+ public void testPublish()
+ throws InterruptedException, TimeoutException {
//GIVEN
TestMetrics testMetrics = metrics
.register("TestMetrics", "Testing metrics", new TestMetrics());
testMetrics.numBucketCreateFails.incr();
- //WHEN
- String writtenMetrics = publishMetricsAndGetOutput();
+ String writtenMetrics = waitForMetricsToPublish("test_metrics_num");
//THEN
Assertions.assertTrue(
@@ -103,11 +106,14 @@ public class TestPrometheusMetricsSink {
"test_metrics_num_bucket_create_fails{context=\"dfs\""),
"The expected metric line is missing from prometheus metrics output"
);
+
+ metrics.unregisterSource("TestMetrics");
}
@Test
- public void testPublishWithSameName() throws IOException {
- //GIVEN
+ public void testPublishWithSameName()
+ throws InterruptedException, TimeoutException {
+ // GIVEN
metrics.register("FooBar", "fooBar", (MetricsSource) (collector, all) -> {
collector.addRecord("RpcMetrics").add(new MetricsTag(PORT_INFO, "1234"))
.addGauge(COUNTER_INFO, COUNTER_1).endRecord();
@@ -116,8 +122,7 @@ public class TestPrometheusMetricsSink {
PORT_INFO, "2345")).addGauge(COUNTER_INFO, COUNTER_2).endRecord();
});
- // WHEN
- String writtenMetrics = publishMetricsAndGetOutput();
+ String writtenMetrics = waitForMetricsToPublish("rpc_metrics_counter");
// THEN
Assertions.assertTrue(
@@ -127,11 +132,14 @@ public class TestPrometheusMetricsSink {
Assertions.assertTrue(
writtenMetrics.contains("rpc_metrics_counter{port=\"1234\""),
"The expected metric line is missing from prometheus metrics output");
+
+ metrics.unregisterSource("FooBar");
}
@Test
- public void testTypeWithSameNameButDifferentLabels() throws IOException {
- //GIVEN
+ public void testTypeWithSameNameButDifferentLabels()
+ throws InterruptedException, TimeoutException {
+ // GIVEN
metrics.register("SameName", "sameName",
(MetricsSource) (collector, all) -> {
collector.addRecord("SameName").add(new MetricsTag(PORT_INFO,
"1234"))
@@ -141,59 +149,63 @@ public class TestPrometheusMetricsSink {
});
// WHEN
- String writtenMetrics = publishMetricsAndGetOutput();
+ String writtenMetrics = waitForMetricsToPublish("same_name_counter");
// THEN
Assertions.assertEquals(1, StringUtils.countMatches(writtenMetrics,
"# TYPE same_name_counter"));
- }
- @Test
- public void testNamingCamelCase() {
- //THEN
- Assertions.assertEquals("rpc_time_some_metrics",
- sink.prometheusName("RpcTime", "SomeMetrics"));
-
- Assertions.assertEquals("om_rpc_time_om_info_keys",
- sink.prometheusName("OMRpcTime", "OMInfoKeys"));
-
- Assertions.assertEquals("rpc_time_small",
- sink.prometheusName("RpcTime", "small"));
- }
+ // both metrics should be present
+ Assertions.assertTrue(
+ writtenMetrics.contains("same_name_counter{port=\"1234\""),
+ "The expected metric line is present in prometheus metrics output");
+ Assertions.assertTrue(
+ writtenMetrics.contains("same_name_counter{port=\"2345\""),
+ "The expected metric line is present in prometheus metrics output");
- @Test
- public void testNamingRocksDB() {
- //RocksDB metrics are handled differently.
- // THEN
- Assertions.assertEquals("rocksdb_om_db_num_open_connections",
- sink.prometheusName("Rocksdb_om.db", "num_open_connections"));
+ metrics.unregisterSource("SameName");
}
+ /**
+ * Make sure Prometheus metrics start fresh after each flush.
+ * Publish the metrics and flush them,
+ * then unregister one of them and register another.
+ * Publish and flush the metrics again
+ * and then check that the unregistered metric is not present.
+ */
@Test
- public void testNamingPipeline() {
+ public void testRemovingStaleMetricsOnFlush()
+ throws InterruptedException, TimeoutException {
// GIVEN
- String recordName = "SCMPipelineMetrics";
- String metricName = "NumBlocksAllocated-"
- + "RATIS-THREE-47659e3d-40c9-43b3-9792-4982fc279aba";
+ metrics.register("StaleMetric", "staleMetric",
+ (MetricsSource) (collector, all) ->
+ collector.addRecord("StaleMetric")
+ .add(new MetricsTag(PORT_INFO, "1234"))
+ .addGauge(COUNTER_INFO, COUNTER_1).endRecord());
- // THEN
- Assertions.assertEquals(
- "scm_pipeline_metrics_"
- + "num_blocks_allocated_"
- + "ratis_three_47659e3d_40c9_43b3_9792_4982fc279aba",
- sink.prometheusName(recordName, metricName));
- }
+ waitForMetricsToPublish("stale_metric_counter");
- @Test
- public void testNamingSpaces() {
- //GIVEN
- String recordName = "JvmMetrics";
- String metricName = "GcTimeMillisG1 Young Generation";
+ // unregister the metric
+ metrics.unregisterSource("StaleMetric");
+
+ metrics.register("SomeMetric", "someMetric",
+ (MetricsSource) (collector, all) ->
+ collector.addRecord("SomeMetric")
+ .add(new MetricsTag(PORT_INFO, "4321"))
+ .addGauge(COUNTER_INFO, COUNTER_2).endRecord());
+
+ String writtenMetrics = waitForMetricsToPublish("some_metric_counter");
// THEN
- Assertions.assertEquals(
- "jvm_metrics_gc_time_millis_g1_young_generation",
- sink.prometheusName(recordName, metricName));
+ // The first metric shouldn't be present
+ Assertions.assertFalse(
+ writtenMetrics.contains("stale_metric_counter{port=\"1234\""),
+ "The expected metric line is present in prometheus metrics output");
+ Assertions.assertTrue(
+ writtenMetrics.contains("some_metric_counter{port=\"4321\""),
+ "The expected metric line is present in prometheus metrics output");
+
+ metrics.unregisterSource("SomeMetric");
}
private String publishMetricsAndGetOutput() throws IOException {
@@ -204,10 +216,36 @@ public class TestPrometheusMetricsSink {
sink.writeMetrics(writer);
writer.flush();
-
return stream.toString(UTF_8.name());
}
+ /**
+ * metrics.publishMetricsNow() might not finish in a reasonable
+ * amount of time leading to a full queue and any further attempt
+ * for publishing to fail. Wrapping the call with
+ * GenericTestUtils.waitFor() to retry until the queue has been
+ * cleared and publish is a success.
+ *
+ * @param registeredMetric to check if it's published
+ * @return all published metrics
+ */
+ private String waitForMetricsToPublish(String registeredMetric)
+ throws InterruptedException, TimeoutException {
+
+ final String[] writtenMetrics = new String[1];
+
+ GenericTestUtils.waitFor(() -> {
+ try {
+ writtenMetrics[0] = publishMetricsAndGetOutput();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return writtenMetrics[0].contains(registeredMetric);
+ }, 1000, 120000);
+
+ return writtenMetrics[0];
+ }
+
/**
* Example metric pojo.
*/
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/http/TestPrometheusMetricsSink.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/http/TestPrometheusMetricsSink.java
index 5339a06bbb..dc15d805d6 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/http/TestPrometheusMetricsSink.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/http/TestPrometheusMetricsSink.java
@@ -17,23 +17,8 @@
*/
package org.apache.hadoop.hdds.server.http;
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.metrics2.MetricsInfo;
-import org.apache.hadoop.metrics2.MetricsSource;
-import org.apache.hadoop.metrics2.MetricsSystem;
-import org.apache.hadoop.metrics2.MetricsTag;
-import org.apache.hadoop.metrics2.annotation.Metric;
-import org.apache.hadoop.metrics2.annotation.Metrics;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.apache.hadoop.metrics2.lib.MutableCounterLong;
-import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
/**
@@ -41,111 +26,11 @@ import org.junit.jupiter.api.Test;
*/
public class TestPrometheusMetricsSink {
- private MetricsSystem metrics;
- private PrometheusMetricsSink sink;
-
- private static final MetricsInfo PORT_INFO = new MetricsInfo() {
- @Override
- public String name() {
- return "PORT";
- }
-
- @Override
- public String description() {
- return "port";
- }
- };
-
- private static final MetricsInfo COUNTER_INFO = new MetricsInfo() {
- @Override
- public String name() {
- return "COUNTER";
- }
-
- @Override
- public String description() {
- return "counter";
- }
- };
-
- private static final int COUNTER_1 = 123;
- private static final int COUNTER_2 = 234;
+ private static PrometheusMetricsSink sink;
- @BeforeEach
- public void init() {
- metrics = DefaultMetricsSystem.instance();
-
- metrics.init("test");
+ @BeforeAll
+ public static void setUp() {
sink = new PrometheusMetricsSink();
- metrics.register("Prometheus", "Prometheus", sink);
- }
-
- @AfterEach
- public void tearDown() {
- metrics.stop();
- metrics.shutdown();
- }
-
- @Test
- public void testPublish() throws IOException {
- //GIVEN
- TestMetrics testMetrics = metrics
- .register("TestMetrics", "Testing metrics", new TestMetrics());
-
- testMetrics.numBucketCreateFails.incr();
-
- //WHEN
- String writtenMetrics = publishMetricsAndGetOutput();
-
- //THEN
- Assertions.assertTrue(
- writtenMetrics.contains(
- "test_metrics_num_bucket_create_fails{context=\"dfs\""),
- "The expected metric line is missing from prometheus metrics output"
- );
- }
-
- @Test
- public void testPublishWithSameName() throws IOException {
- //GIVEN
- metrics.register("FooBar", "fooBar", (MetricsSource) (collector, all) -> {
- collector.addRecord("RpcMetrics").add(new MetricsTag(PORT_INFO, "1234"))
- .addGauge(COUNTER_INFO, COUNTER_1).endRecord();
-
- collector.addRecord("RpcMetrics").add(new MetricsTag(
- PORT_INFO, "2345")).addGauge(COUNTER_INFO, COUNTER_2).endRecord();
- });
-
- // WHEN
- String writtenMetrics = publishMetricsAndGetOutput();
-
- // THEN
- Assertions.assertTrue(
- writtenMetrics.contains("rpc_metrics_counter{port=\"2345\""),
- "The expected metric line is missing from prometheus metrics output");
-
- Assertions.assertTrue(
- writtenMetrics.contains("rpc_metrics_counter{port=\"1234\""),
- "The expected metric line is missing from prometheus metrics output");
- }
-
- @Test
- public void testTypeWithSameNameButDifferentLabels() throws IOException {
- //GIVEN
- metrics.register("SameName", "sameName",
- (MetricsSource) (collector, all) -> {
- collector.addRecord("SameName").add(new MetricsTag(PORT_INFO,
"1234"))
- .addGauge(COUNTER_INFO, COUNTER_1).endRecord();
- collector.addRecord("SameName").add(new MetricsTag(PORT_INFO,
"2345"))
- .addGauge(COUNTER_INFO, COUNTER_2).endRecord();
- });
-
- // WHEN
- String writtenMetrics = publishMetricsAndGetOutput();
-
- // THEN
- Assertions.assertEquals(1, StringUtils.countMatches(writtenMetrics,
- "# TYPE same_name_counter"));
}
@Test
@@ -195,27 +80,4 @@ public class TestPrometheusMetricsSink {
"jvm_metrics_gc_time_millis_g1_young_generation",
sink.prometheusName(recordName, metricName));
}
-
- private String publishMetricsAndGetOutput() throws IOException {
- metrics.publishMetricsNow();
-
- ByteArrayOutputStream stream = new ByteArrayOutputStream();
- OutputStreamWriter writer = new OutputStreamWriter(stream, UTF_8);
-
- sink.writeMetrics(writer);
- writer.flush();
-
- return stream.toString(UTF_8.name());
- }
-
- /**
- * Example metric pojo.
- */
- @Metrics(about = "Test Metrics", context = "dfs")
- private static class TestMetrics {
-
- @Metric
- private MutableCounterLong numBucketCreateFails;
- }
-
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]