This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6a1bbe6ba09 [feat][broker] PIP-264: Add OpenTelemetry broker
connection metrics (#22931)
6a1bbe6ba09 is described below
commit 6a1bbe6ba092336ff66658f985a25de901687683
Author: Dragos Misca <[email protected]>
AuthorDate: Tue Jun 18 17:30:08 2024 -0700
[feat][broker] PIP-264: Add OpenTelemetry broker connection metrics (#22931)
---
.../apache/pulsar/broker/service/PulsarStats.java | 3 +-
.../broker/stats/BrokerOperabilityMetrics.java | 57 +++++++++--
.../OpenTelemetryBrokerOperabilityStatsTest.java | 104 +++++++++++++++++++++
.../opentelemetry/OpenTelemetryAttributes.java | 16 ++++
4 files changed, 170 insertions(+), 10 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java
index db14892d266..7ffc7818d4c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java
@@ -78,8 +78,7 @@ public class PulsarStats implements Closeable {
this.bundleStats = new ConcurrentHashMap<>();
this.tempMetricsCollection = new ArrayList<>();
this.metricsCollection = new ArrayList<>();
- this.brokerOperabilityMetrics = new
BrokerOperabilityMetrics(pulsar.getConfiguration().getClusterName(),
- pulsar.getAdvertisedAddress());
+ this.brokerOperabilityMetrics = new BrokerOperabilityMetrics(pulsar);
this.tempNonPersistentTopics = new ArrayList<>();
this.exposePublisherStats =
pulsar.getConfiguration().isExposePublisherStats();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java
index b6379d381c6..3f991be8184 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.stats;
+import io.opentelemetry.api.metrics.ObservableLongCounter;
import io.prometheus.client.Counter;
import java.util.ArrayList;
import java.util.HashMap;
@@ -25,32 +26,72 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
+import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.common.stats.Metrics;
+import
org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.ConnectionCreateStatus;
+import
org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.ConnectionStatus;
/**
*/
-public class BrokerOperabilityMetrics {
+public class BrokerOperabilityMetrics implements AutoCloseable {
private static final Counter TOPIC_LOAD_FAILED =
Counter.build("topic_load_failed", "-").register();
private final List<Metrics> metricsList;
private final String localCluster;
private final DimensionStats topicLoadStats;
private final String brokerName;
private final LongAdder connectionTotalCreatedCount;
- private final LongAdder connectionCreateSuccessCount;
- private final LongAdder connectionCreateFailCount;
private final LongAdder connectionTotalClosedCount;
private final LongAdder connectionActive;
- public BrokerOperabilityMetrics(String localCluster, String brokerName) {
+ private final LongAdder connectionCreateSuccessCount;
+ private final LongAdder connectionCreateFailCount;
+
+ public static final String CONNECTION_COUNTER_METRIC_NAME = // metric name passed to counterBuilder for the connection counter
"pulsar.broker.connection.count";
+ private final ObservableLongCounter connectionCounter; // assigned in the constructor, closed in close()
+
+ public static final String CONNECTION_CREATE_COUNTER_METRIC_NAME = // metric name passed to counterBuilder for the connection-create counter
+ "pulsar.broker.connection.create.operation.count";
+ private final ObservableLongCounter connectionCreateCounter; // assigned in the constructor, closed in close()
+
+ public BrokerOperabilityMetrics(PulsarService pulsar) { // now takes the PulsarService instead of (localCluster, brokerName)
this.metricsList = new ArrayList<>();
- this.localCluster = localCluster;
+ this.localCluster = pulsar.getConfiguration().getClusterName(); // cluster name read from the broker configuration
this.topicLoadStats = new DimensionStats("pulsar_topic_load_times",
60);
- this.brokerName = brokerName;
+ this.brokerName = pulsar.getAdvertisedAddress(); // broker name is the advertised address
this.connectionTotalCreatedCount = new LongAdder();
- this.connectionCreateSuccessCount = new LongAdder();
- this.connectionCreateFailCount = new LongAdder();
this.connectionTotalClosedCount = new LongAdder();
this.connectionActive = new LongAdder();
+
+ this.connectionCreateSuccessCount = new LongAdder();
+ this.connectionCreateFailCount = new LongAdder();
+
+ connectionCounter = pulsar.getOpenTelemetry().getMeter() // instrument built from the broker's OpenTelemetry meter
+ .counterBuilder(CONNECTION_COUNTER_METRIC_NAME)
+ .setDescription("The number of connections.")
+ .setUnit("{connection}")
+ .buildWithCallback(measurement -> { // callback records the current LongAdder sums, one value per ConnectionStatus
+ var closedConnections = connectionTotalClosedCount.sum();
+ var openedConnections = connectionTotalCreatedCount.sum();
+ var activeConnections = openedConnections - // derived from the two totals; the connectionActive adder is not read here
closedConnections; // NOTE(review): this difference can decrease, while counterBuilder presumably yields a monotonic counter - confirm ACTIVE belongs on this instrument
+ measurement.record(activeConnections,
ConnectionStatus.ACTIVE.attributes);
+ measurement.record(openedConnections,
ConnectionStatus.OPEN.attributes);
+ measurement.record(closedConnections,
ConnectionStatus.CLOSE.attributes);
+ });
+
+ connectionCreateCounter = pulsar.getOpenTelemetry().getMeter() // second instrument: connection-create outcomes
+ .counterBuilder(CONNECTION_CREATE_COUNTER_METRIC_NAME)
+ .setDescription("The number of connection create operations.")
+ .setUnit("{operation}")
+ .buildWithCallback(measurement -> { // callback records the success and failure sums under separate attributes
+ measurement.record(connectionCreateSuccessCount.sum(),
ConnectionCreateStatus.SUCCESS.attributes);
+ measurement.record(connectionCreateFailCount.sum(),
ConnectionCreateStatus.FAILURE.attributes);
+ });
+ }
+
+ @Override
+ public void close() throws Exception { // closes the two observable counters assigned in the constructor
+ connectionCounter.close();
+ connectionCreateCounter.close(); // not reached if the previous close() throws
}
public List<Metrics> getMetrics() {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryBrokerOperabilityStatsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryBrokerOperabilityStatsTest.java
new file mode 100644
index 00000000000..4378e6b05b3
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryBrokerOperabilityStatsTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats;
+
+import static
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
+import
org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.ConnectionCreateStatus;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class OpenTelemetryBrokerOperabilityStatsTest extends BrokerTestBase { // asserts on the connection metrics named in BrokerOperabilityMetrics
+
+ @BeforeMethod(alwaysRun = true)
+ @Override
+ protected void setup() throws Exception {
+ super.baseSetup();
+ }
+
+ @Override
+ protected void
customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder
pulsarTestContextBuilder) {
+ super.customizeMainPulsarTestContextBuilder(pulsarTestContextBuilder);
+ pulsarTestContextBuilder.enableOpenTelemetry(true); // the test reads metrics through the context's OpenTelemetry metric reader
+ }
+
+ @AfterMethod(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test
+ public void testBrokerConnection() throws Exception {
+ var topicName =
BrokerTestUtil.newUniqueName("persistent://my-namespace/use/my-ns/testBrokerConnection");
+
+ @Cleanup
+ var producer = pulsarClient.newProducer().topic(topicName).create(); // phase 1: one producer; the assertions below expect OPEN=1, CLOSE=0, ACTIVE=1
+
+ var metrics =
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+ assertMetricLongSumValue(metrics,
BrokerOperabilityMetrics.CONNECTION_COUNTER_METRIC_NAME,
+ OpenTelemetryAttributes.ConnectionStatus.OPEN.attributes, 1);
+ assertMetricLongSumValue(metrics,
BrokerOperabilityMetrics.CONNECTION_COUNTER_METRIC_NAME,
+ OpenTelemetryAttributes.ConnectionStatus.CLOSE.attributes, 0);
+ assertMetricLongSumValue(metrics,
BrokerOperabilityMetrics.CONNECTION_COUNTER_METRIC_NAME,
+ OpenTelemetryAttributes.ConnectionStatus.ACTIVE.attributes, 1);
+
+ assertMetricLongSumValue(metrics,
BrokerOperabilityMetrics.CONNECTION_CREATE_COUNTER_METRIC_NAME,
+ ConnectionCreateStatus.SUCCESS.attributes, 1);
+ assertMetricLongSumValue(metrics,
BrokerOperabilityMetrics.CONNECTION_CREATE_COUNTER_METRIC_NAME,
+ ConnectionCreateStatus.FAILURE.attributes, 0);
+
+ pulsarClient.close(); // phase 2: after closing the client, CLOSE is expected to be 1
+
+ metrics =
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+ assertMetricLongSumValue(metrics,
BrokerOperabilityMetrics.CONNECTION_COUNTER_METRIC_NAME,
+ OpenTelemetryAttributes.ConnectionStatus.CLOSE.attributes, 1);
+
+ pulsar.getConfiguration().setAuthenticationEnabled(true); // phase 3: with authentication enabled, the producer creation below is expected to fail
+
+ replacePulsarClient(PulsarClient.builder()
+ .serviceUrl(lookupUrl.toString())
+ .operationTimeout(1, TimeUnit.MILLISECONDS)); // NOTE(review): 1 ms timeout while AuthenticationException is expected below - confirm this cannot surface as a timeout instead
+ assertThatThrownBy(() ->
pulsarClient.newProducer().topic(topicName).create())
+
.isInstanceOf(PulsarClientException.AuthenticationException.class);
+ pulsarClient.close();
+
+ metrics =
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(); // final snapshot: expects OPEN=2, CLOSE=2, ACTIVE=0, create SUCCESS=1, FAILURE=1
+ assertMetricLongSumValue(metrics,
BrokerOperabilityMetrics.CONNECTION_COUNTER_METRIC_NAME,
+ OpenTelemetryAttributes.ConnectionStatus.OPEN.attributes, 2);
+ assertMetricLongSumValue(metrics,
BrokerOperabilityMetrics.CONNECTION_COUNTER_METRIC_NAME,
+ OpenTelemetryAttributes.ConnectionStatus.CLOSE.attributes, 2);
+ assertMetricLongSumValue(metrics,
BrokerOperabilityMetrics.CONNECTION_COUNTER_METRIC_NAME,
+ OpenTelemetryAttributes.ConnectionStatus.ACTIVE.attributes, 0);
+
+ assertMetricLongSumValue(metrics,
BrokerOperabilityMetrics.CONNECTION_CREATE_COUNTER_METRIC_NAME,
+ ConnectionCreateStatus.SUCCESS.attributes, 1);
+ assertMetricLongSumValue(metrics,
BrokerOperabilityMetrics.CONNECTION_CREATE_COUNTER_METRIC_NAME,
+ ConnectionCreateStatus.FAILURE.attributes, 1);
+ }
+}
diff --git
a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
index 004741b6dfb..6639cd68b39 100644
---
a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
+++
b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
@@ -142,4 +142,20 @@ public interface OpenTelemetryAttributes {
TIME;
public final Attributes attributes =
Attributes.of(PULSAR_BACKLOG_QUOTA_TYPE, name().toLowerCase());
}
+
+ AttributeKey<String> PULSAR_CONNECTION_STATUS =
AttributeKey.stringKey("pulsar.connection.status");
+ enum ConnectionStatus { // one constant per value of the PULSAR_CONNECTION_STATUS attribute
+ ACTIVE,
+ OPEN,
+ CLOSE;
+ public final Attributes attributes = // Locale.ROOT keeps the value "active"/"open"/"close" whatever the JVM default locale (e.g. Turkish dotless i)
Attributes.of(PULSAR_CONNECTION_STATUS, name().toLowerCase(java.util.Locale.ROOT));
+ }
+
+ AttributeKey<String> PULSAR_CONNECTION_CREATE_STATUS =
+
AttributeKey.stringKey("pulsar.connection.create.operation.status");
+ enum ConnectionCreateStatus { // one constant per value of the PULSAR_CONNECTION_CREATE_STATUS attribute
+ SUCCESS,
+ FAILURE;
+ public final Attributes attributes = // Locale.ROOT keeps the value "success"/"failure" whatever the JVM default locale (e.g. Turkish dotless i)
Attributes.of(PULSAR_CONNECTION_CREATE_STATUS, name().toLowerCase(java.util.Locale.ROOT));
+ }
}