This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a1bbe6ba09 [feat][broker] PIP-264: Add OpenTelemetry broker 
connection metrics (#22931)
6a1bbe6ba09 is described below

commit 6a1bbe6ba092336ff66658f985a25de901687683
Author: Dragos Misca <[email protected]>
AuthorDate: Tue Jun 18 17:30:08 2024 -0700

    [feat][broker] PIP-264: Add OpenTelemetry broker connection metrics (#22931)
---
 .../apache/pulsar/broker/service/PulsarStats.java  |   3 +-
 .../broker/stats/BrokerOperabilityMetrics.java     |  57 +++++++++--
 .../OpenTelemetryBrokerOperabilityStatsTest.java   | 104 +++++++++++++++++++++
 .../opentelemetry/OpenTelemetryAttributes.java     |  16 ++++
 4 files changed, 170 insertions(+), 10 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java
index db14892d266..7ffc7818d4c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java
@@ -78,8 +78,7 @@ public class PulsarStats implements Closeable {
         this.bundleStats = new ConcurrentHashMap<>();
         this.tempMetricsCollection = new ArrayList<>();
         this.metricsCollection = new ArrayList<>();
-        this.brokerOperabilityMetrics = new 
BrokerOperabilityMetrics(pulsar.getConfiguration().getClusterName(),
-                pulsar.getAdvertisedAddress());
+        this.brokerOperabilityMetrics = new BrokerOperabilityMetrics(pulsar);
         this.tempNonPersistentTopics = new ArrayList<>();
 
         this.exposePublisherStats = 
pulsar.getConfiguration().isExposePublisherStats();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java
index b6379d381c6..3f991be8184 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.stats;
 
+import io.opentelemetry.api.metrics.ObservableLongCounter;
 import io.prometheus.client.Counter;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -25,32 +26,72 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.LongAdder;
+import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.common.stats.Metrics;
+import 
org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.ConnectionCreateStatus;
+import 
org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.ConnectionStatus;
 
 /**
  */
-public class BrokerOperabilityMetrics {
+public class BrokerOperabilityMetrics implements AutoCloseable {
     private static final Counter TOPIC_LOAD_FAILED = 
Counter.build("topic_load_failed", "-").register();
     private final List<Metrics> metricsList;
     private final String localCluster;
     private final DimensionStats topicLoadStats;
     private final String brokerName;
     private final LongAdder connectionTotalCreatedCount;
-    private final LongAdder connectionCreateSuccessCount;
-    private final LongAdder connectionCreateFailCount;
     private final LongAdder connectionTotalClosedCount;
     private final LongAdder connectionActive;
 
-    public BrokerOperabilityMetrics(String localCluster, String brokerName) {
+    private final LongAdder connectionCreateSuccessCount;
+    private final LongAdder connectionCreateFailCount;
+
+    public static final String CONNECTION_COUNTER_METRIC_NAME = 
"pulsar.broker.connection.count";
+    private final ObservableLongCounter connectionCounter;
+
+    public static final String CONNECTION_CREATE_COUNTER_METRIC_NAME =
+            "pulsar.broker.connection.create.operation.count";
+    private final ObservableLongCounter connectionCreateCounter;
+
+    public BrokerOperabilityMetrics(PulsarService pulsar) {
         this.metricsList = new ArrayList<>();
-        this.localCluster = localCluster;
+        this.localCluster = pulsar.getConfiguration().getClusterName();
         this.topicLoadStats = new DimensionStats("pulsar_topic_load_times", 
60);
-        this.brokerName = brokerName;
+        this.brokerName = pulsar.getAdvertisedAddress();
         this.connectionTotalCreatedCount = new LongAdder();
-        this.connectionCreateSuccessCount = new LongAdder();
-        this.connectionCreateFailCount = new LongAdder();
         this.connectionTotalClosedCount = new LongAdder();
         this.connectionActive = new LongAdder();
+
+        this.connectionCreateSuccessCount = new LongAdder();
+        this.connectionCreateFailCount = new LongAdder();
+
+        connectionCounter = pulsar.getOpenTelemetry().getMeter()
+                .counterBuilder(CONNECTION_COUNTER_METRIC_NAME)
+                .setDescription("The number of connections.")
+                .setUnit("{connection}")
+                .buildWithCallback(measurement -> {
+                    var closedConnections = connectionTotalClosedCount.sum();
+                    var openedConnections = connectionTotalCreatedCount.sum();
+                    var activeConnections = openedConnections - 
closedConnections;
+                    measurement.record(activeConnections, 
ConnectionStatus.ACTIVE.attributes);
+                    measurement.record(openedConnections, 
ConnectionStatus.OPEN.attributes);
+                    measurement.record(closedConnections, 
ConnectionStatus.CLOSE.attributes);
+                });
+
+        connectionCreateCounter = pulsar.getOpenTelemetry().getMeter()
+                .counterBuilder(CONNECTION_CREATE_COUNTER_METRIC_NAME)
+                .setDescription("The number of connection create operations.")
+                .setUnit("{operation}")
+                .buildWithCallback(measurement -> {
+                    measurement.record(connectionCreateSuccessCount.sum(), 
ConnectionCreateStatus.SUCCESS.attributes);
+                    measurement.record(connectionCreateFailCount.sum(), 
ConnectionCreateStatus.FAILURE.attributes);
+                });
+    }
+
+    @Override
+    public void close() throws Exception {
+        connectionCounter.close();
+        connectionCreateCounter.close();
     }
 
     public List<Metrics> getMetrics() {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryBrokerOperabilityStatsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryBrokerOperabilityStatsTest.java
new file mode 100644
index 00000000000..4378e6b05b3
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryBrokerOperabilityStatsTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats;
+
+import static 
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
+import 
org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.ConnectionCreateStatus;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class OpenTelemetryBrokerOperabilityStatsTest extends BrokerTestBase {
+
+    @BeforeMethod(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.baseSetup();
+    }
+
+    @Override
+    protected void 
customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder 
pulsarTestContextBuilder) {
+        super.customizeMainPulsarTestContextBuilder(pulsarTestContextBuilder);
+        pulsarTestContextBuilder.enableOpenTelemetry(true);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testBrokerConnection() throws Exception {
+        var topicName = 
BrokerTestUtil.newUniqueName("persistent://my-namespace/use/my-ns/testBrokerConnection");
+
+        @Cleanup
+        var producer = pulsarClient.newProducer().topic(topicName).create();
+
+        var metrics = 
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+        assertMetricLongSumValue(metrics, 
BrokerOperabilityMetrics.CONNECTION_COUNTER_METRIC_NAME,
+                OpenTelemetryAttributes.ConnectionStatus.OPEN.attributes, 1);
+        assertMetricLongSumValue(metrics, 
BrokerOperabilityMetrics.CONNECTION_COUNTER_METRIC_NAME,
+                OpenTelemetryAttributes.ConnectionStatus.CLOSE.attributes, 0);
+        assertMetricLongSumValue(metrics, 
BrokerOperabilityMetrics.CONNECTION_COUNTER_METRIC_NAME,
+                OpenTelemetryAttributes.ConnectionStatus.ACTIVE.attributes, 1);
+
+        assertMetricLongSumValue(metrics, 
BrokerOperabilityMetrics.CONNECTION_CREATE_COUNTER_METRIC_NAME,
+                ConnectionCreateStatus.SUCCESS.attributes, 1);
+        assertMetricLongSumValue(metrics, 
BrokerOperabilityMetrics.CONNECTION_CREATE_COUNTER_METRIC_NAME,
+                ConnectionCreateStatus.FAILURE.attributes, 0);
+
+        pulsarClient.close();
+
+        metrics = 
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+        assertMetricLongSumValue(metrics, 
BrokerOperabilityMetrics.CONNECTION_COUNTER_METRIC_NAME,
+                OpenTelemetryAttributes.ConnectionStatus.CLOSE.attributes, 1);
+
+        pulsar.getConfiguration().setAuthenticationEnabled(true);
+
+        replacePulsarClient(PulsarClient.builder()
+                .serviceUrl(lookupUrl.toString())
+                .operationTimeout(1, TimeUnit.MILLISECONDS));
+        assertThatThrownBy(() -> 
pulsarClient.newProducer().topic(topicName).create())
+                
.isInstanceOf(PulsarClientException.AuthenticationException.class);
+        pulsarClient.close();
+
+        metrics = 
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+        assertMetricLongSumValue(metrics, 
BrokerOperabilityMetrics.CONNECTION_COUNTER_METRIC_NAME,
+                OpenTelemetryAttributes.ConnectionStatus.OPEN.attributes, 2);
+        assertMetricLongSumValue(metrics, 
BrokerOperabilityMetrics.CONNECTION_COUNTER_METRIC_NAME,
+                OpenTelemetryAttributes.ConnectionStatus.CLOSE.attributes, 2);
+        assertMetricLongSumValue(metrics, 
BrokerOperabilityMetrics.CONNECTION_COUNTER_METRIC_NAME,
+                OpenTelemetryAttributes.ConnectionStatus.ACTIVE.attributes, 0);
+
+        assertMetricLongSumValue(metrics, 
BrokerOperabilityMetrics.CONNECTION_CREATE_COUNTER_METRIC_NAME,
+                ConnectionCreateStatus.SUCCESS.attributes, 1);
+        assertMetricLongSumValue(metrics, 
BrokerOperabilityMetrics.CONNECTION_CREATE_COUNTER_METRIC_NAME,
+                ConnectionCreateStatus.FAILURE.attributes, 1);
+    }
+}
diff --git 
a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
 
b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
index 004741b6dfb..6639cd68b39 100644
--- 
a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
+++ 
b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
@@ -142,4 +142,20 @@ public interface OpenTelemetryAttributes {
         TIME;
         public final Attributes attributes = 
Attributes.of(PULSAR_BACKLOG_QUOTA_TYPE, name().toLowerCase());
     }
+
+    AttributeKey<String> PULSAR_CONNECTION_STATUS = 
AttributeKey.stringKey("pulsar.connection.status");
+    enum ConnectionStatus {
+        ACTIVE,
+        OPEN,
+        CLOSE;
+        public final Attributes attributes = 
Attributes.of(PULSAR_CONNECTION_STATUS, name().toLowerCase());
+    }
+
+    AttributeKey<String> PULSAR_CONNECTION_CREATE_STATUS =
+            
AttributeKey.stringKey("pulsar.connection.create.operation.status");
+    enum ConnectionCreateStatus {
+        SUCCESS,
+        FAILURE;
+        public final Attributes attributes = 
Attributes.of(PULSAR_CONNECTION_CREATE_STATUS, name().toLowerCase());
+    }
 }

Reply via email to