This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new dd1b57944b1 [feat][misc] PIP-264: Copy OpenTelemetry resource attributes to Prometheus labels (#23005)
dd1b57944b1 is described below
commit dd1b57944b117d16ebd371996b44c02af2ce325c
Author: Dragos Misca <[email protected]>
AuthorDate: Fri Jul 5 00:55:06 2024 -0700
[feat][misc] PIP-264: Copy OpenTelemetry resource attributes to Prometheus labels (#23005)
---
.../pulsar/opentelemetry/OpenTelemetryService.java | 15 +++++++++
.../metrics/OpenTelemetrySanityTest.java | 39 ++++++++++++----------
2 files changed, 37 insertions(+), 17 deletions(-)
diff --git a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryService.java b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryService.java
index b32d353eb5a..e6c6d95273e 100644
--- a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryService.java
+++ b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryService.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.opentelemetry;
import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.annotations.VisibleForTesting;
import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.exporter.prometheus.PrometheusHttpServer;
import io.opentelemetry.instrumentation.runtimemetrics.java17.RuntimeMetrics;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
@@ -97,6 +98,20 @@ public class OpenTelemetryService implements Closeable {
return resource.merge(resourceBuilder.build());
});
+ sdkBuilder.addMetricReaderCustomizer((metricReader, configProperties) -> {
+ if (metricReader instanceof PrometheusHttpServer prometheusHttpServer) {
+ // At this point, the server is already started. We need to close it and create a new one with the
+ // correct resource attributes filter.
+ prometheusHttpServer.close();
+
+ // Allow all resource attributes to be exposed.
+ return prometheusHttpServer.toBuilder()
+ .setAllowedResourceAttributesFilter(s -> true)
+ .build();
+ }
+ return metricReader;
+ });
+
if (builderCustomizer != null) {
builderCustomizer.accept(sdkBuilder);
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/metrics/OpenTelemetrySanityTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/metrics/OpenTelemetrySanityTest.java
index 38afc1f127d..31e600f3aa8 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/metrics/OpenTelemetrySanityTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/metrics/OpenTelemetrySanityTest.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.tests.integration.metrics;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.awaitility.Awaitility.waitAtMost;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -37,7 +39,6 @@ import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
import org.apache.pulsar.tests.integration.topologies.PulsarTestBase;
-import org.awaitility.Awaitility;
import org.testng.annotations.Test;
public class OpenTelemetrySanityTest {
@@ -71,17 +72,17 @@ public class OpenTelemetrySanityTest {
// TODO: Validate cluster name and service version are present once
// https://github.com/open-telemetry/opentelemetry-java/issues/6108 is solved.
var metricName = "queueSize_ratio"; // Sent automatically by the OpenTelemetry SDK.
- Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> {
+ waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> {
var metrics = getMetricsFromPrometheus(
openTelemetryCollectorContainer,
OpenTelemetryCollectorContainer.PROMETHEUS_EXPORTER_PORT);
return !metrics.findByNameAndLabels(metricName, "job",
PulsarBrokerOpenTelemetry.SERVICE_NAME).isEmpty();
});
- Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> {
+ waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> {
var metrics = getMetricsFromPrometheus(
openTelemetryCollectorContainer,
OpenTelemetryCollectorContainer.PROMETHEUS_EXPORTER_PORT);
return !metrics.findByNameAndLabels(metricName, "job",
PulsarProxyOpenTelemetry.SERVICE_NAME).isEmpty();
});
- Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> {
+ waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> {
var metrics = getMetricsFromPrometheus(
openTelemetryCollectorContainer,
OpenTelemetryCollectorContainer.PROMETHEUS_EXPORTER_PORT);
return !metrics.findByNameAndLabels(metricName, "job",
PulsarWorkerOpenTelemetry.SERVICE_NAME).isEmpty();
@@ -120,30 +121,34 @@ public class OpenTelemetrySanityTest {
pulsarCluster.start();
pulsarCluster.setupFunctionWorkers(PulsarTestBase.randomName(),
FunctionRuntimeType.PROCESS, 1);
- var metricName = "target_info"; // Sent automatically by the OpenTelemetry SDK.
- Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> {
- var metrics = getMetricsFromPrometheus(pulsarCluster.getBroker(0), prometheusExporterPort);
- return !metrics.findByNameAndLabels(metricName,
+ var targetInfoMetricName = "target_info"; // Sent automatically by the OpenTelemetry SDK.
+ var cpuCountMetricName = "jvm_cpu_count"; // Configured by the OpenTelemetryService.
+ waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).untilAsserted(() -> {
+ var expectedMetrics = new String[] {targetInfoMetricName, cpuCountMetricName, "pulsar_broker_topic_producer_count"};
+ var actualMetrics = getMetricsFromPrometheus(pulsarCluster.getBroker(0), prometheusExporterPort);
+ assertThat(expectedMetrics).allMatch(expectedMetric -> !actualMetrics.findByNameAndLabels(expectedMetric,
Pair.of("pulsar_cluster", clusterName),
Pair.of("service_name",
PulsarBrokerOpenTelemetry.SERVICE_NAME),
Pair.of("service_version", PulsarVersion.getVersion()),
- Pair.of("host_name", pulsarCluster.getBroker(0).getHostname())).isEmpty();
+ Pair.of("host_name", pulsarCluster.getBroker(0).getHostname())).isEmpty());
});
- Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> {
- var metrics = getMetricsFromPrometheus(pulsarCluster.getProxy(), prometheusExporterPort);
- return !metrics.findByNameAndLabels(metricName,
+ waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).untilAsserted(() -> {
+ var expectedMetrics = new String[] {targetInfoMetricName, cpuCountMetricName};
+ var actualMetrics = getMetricsFromPrometheus(pulsarCluster.getProxy(), prometheusExporterPort);
+ assertThat(expectedMetrics).allMatch(expectedMetric -> !actualMetrics.findByNameAndLabels(expectedMetric,
Pair.of("pulsar_cluster", clusterName),
Pair.of("service_name",
PulsarProxyOpenTelemetry.SERVICE_NAME),
Pair.of("service_version", PulsarVersion.getVersion()),
- Pair.of("host_name", pulsarCluster.getProxy().getHostname())).isEmpty();
+ Pair.of("host_name", pulsarCluster.getProxy().getHostname())).isEmpty());
});
- Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> {
- var metrics = getMetricsFromPrometheus(pulsarCluster.getAnyWorker(), prometheusExporterPort);
- return !metrics.findByNameAndLabels(metricName,
+ waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).untilAsserted(() -> {
+ var expectedMetrics = new String[] {targetInfoMetricName, cpuCountMetricName};
+ var actualMetrics = getMetricsFromPrometheus(pulsarCluster.getAnyWorker(), prometheusExporterPort);
+ assertThat(expectedMetrics).allMatch(expectedMetric -> !actualMetrics.findByNameAndLabels(expectedMetric,
Pair.of("pulsar_cluster", clusterName),
Pair.of("service_name",
PulsarWorkerOpenTelemetry.SERVICE_NAME),
Pair.of("service_version", PulsarVersion.getVersion()),
- Pair.of("host_name", pulsarCluster.getAnyWorker().getHostname())).isEmpty();
+ Pair.of("host_name", pulsarCluster.getAnyWorker().getHostname())).isEmpty());
});
}