shibd commented on code in PR #25306:
URL: https://github.com/apache/pulsar/pull/25306#discussion_r2939323065


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryTopicStats.java:
##########
@@ -415,73 +415,94 @@ private void recordMetricsForTopic(Topic topic) {
             var persistentTopicMetrics = 
persistentTopic.getPersistentTopicMetrics();
 
             var persistentTopicAttributes = 
persistentTopic.getTopicAttributes();
+            // For persistent topics, get custom attributes once and reuse
+            var customLabels = persistentTopicAttributes.getCustomAttributes();
+            var attributesWithCustomLabels = persistentTopicAttributes
+                .buildAttributesWithCustomLabels(attributes, customLabels);
             var managedLedger = persistentTopic.getManagedLedger();
             var managedLedgerStats = 
persistentTopic.getManagedLedger().getStats();
-            storageCounter.record(managedLedgerStats.getStoredMessagesSize(), 
attributes);
-            
storageLogicalCounter.record(managedLedgerStats.getStoredMessagesLogicalSize(), 
attributes);
-            
storageBacklogCounter.record(managedLedger.getEstimatedBacklogSize(), 
attributes);
-            storageOffloadedCounter.record(managedLedger.getOffloadedSize(), 
attributes);
-            
storageInCounter.record(managedLedgerStats.getReadEntriesSucceededTotal(), 
attributes);
-            
storageOutCounter.record(managedLedgerStats.getAddEntrySucceedTotal(), 
attributes);
+            storageCounter.record(managedLedgerStats.getStoredMessagesSize(), 
attributesWithCustomLabels);

Review Comment:
   Could we push this concern down into `PersistentTopicAttributes` instead of 
threading `attributesWithCustomLabels` through each recorder here? 
`TopicAttributes` already exposes `getCommonAttributes()`, so overriding that 
in `PersistentTopicAttributes` to merge in `getCustomAttributes()` would let 
both the shared topic metrics and the persistent-only metrics pick up the 
custom labels automatically. That would also reduce the amount of mechanical 
churn in `OpenTelemetryTopicStats` and make it harder to miss one metric when 
new recorders are added later.
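
   A minimal sketch of that shape, under stated assumptions: the class names below are hypothetical stand-ins, and plain `Map<String, String>` replaces `io.opentelemetry.api.common.Attributes` so the example runs on its own. Letting built-in keys win on a collision is also an assumption, not something the PR specifies.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for TopicAttributes: exposes the shared attribute set.
class TopicAttributesSketch {
    private final Map<String, String> commonAttributes;

    TopicAttributesSketch(Map<String, String> commonAttributes) {
        this.commonAttributes = Map.copyOf(commonAttributes);
    }

    Map<String, String> getCommonAttributes() {
        return commonAttributes;
    }
}

// Hypothetical stand-in for PersistentTopicAttributes: merges the custom
// labels once, at construction time, and returns the merged view from the
// overridden getter so every caller picks them up automatically.
class PersistentTopicAttributesSketch extends TopicAttributesSketch {
    private final Map<String, String> mergedAttributes;

    PersistentTopicAttributesSketch(Map<String, String> common, Map<String, String> custom) {
        super(common);
        Map<String, String> merged = new LinkedHashMap<>(custom);
        // Assumption: built-in attributes take precedence over custom labels.
        merged.putAll(common);
        this.mergedAttributes = Collections.unmodifiableMap(merged);
    }

    @Override
    Map<String, String> getCommonAttributes() {
        return mergedAttributes;
    }
}
```

   With something like this, a recorder that calls `getCommonAttributes()` would not need to know whether custom labels exist, which is the point of pushing the merge down.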



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryTopicStats.java:
##########
@@ -415,73 +415,94 @@ private void recordMetricsForTopic(Topic topic) {
             var persistentTopicMetrics = 
persistentTopic.getPersistentTopicMetrics();
 
             var persistentTopicAttributes = 
persistentTopic.getTopicAttributes();
+            // For persistent topics, get custom attributes once and reuse
+            var customLabels = persistentTopicAttributes.getCustomAttributes();
+            var attributesWithCustomLabels = persistentTopicAttributes

Review Comment:
   
https://github.com/apache/pulsar/pull/25306/changes#diff-e3d8386fc3f9292780771f4eae19d9297637d374244f9c4b3f89b6c0e442e2e3R399-R412



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryCustomLabelsTest.java:
##########
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats;
+
+import static 
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
+import static org.assertj.core.api.Assertions.assertThat;
+import io.opentelemetry.api.common.Attributes;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class OpenTelemetryCustomLabelsTest extends BrokerTestBase {
+
+    private static final Set<String> ALLOWED_CUSTOM_METRIC_LABEL_KEYS = 
Set.of("__SLA_TIER", "APP_OWNER");

Review Comment:
   The allow-list key here is `__SLA_TIER`, but the test later writes the 
property `SLA_TIER`. `getCustomMetricLabelsMap()` filters by exact property key 
before lowercasing for OpenTelemetry output, so `SLA_TIER` is not actually on 
the allow-list.
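
   To illustrate the mismatch, here is a rough sketch of exact-match filtering followed by lowercasing. The real `getCustomMetricLabelsMap()` body is not shown in this thread, so `CustomLabelFilterSketch` and `filterAndLowercase` are hypothetical names that only mirror the behaviour described above.

```java
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical helper mirroring the described behaviour: keep only properties
// whose key is on the allow-list (case-sensitive), then lowercase the key.
final class CustomLabelFilterSketch {
    private CustomLabelFilterSketch() {
    }

    static Map<String, String> filterAndLowercase(Map<String, String> properties,
                                                  Set<String> allowedKeys) {
        Map<String, String> labels = new TreeMap<>();
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            // Exact match against the allow-list happens BEFORE lowercasing.
            if (allowedKeys.contains(entry.getKey())) {
                labels.put(entry.getKey().toLowerCase(Locale.ROOT), entry.getValue());
            }
        }
        return labels;
    }
}
```

   Under these assumptions, an allow-list of `__SLA_TIER` and `APP_OWNER` applied to the properties `SLA_TIER` and `APP_OWNER` keeps only `app_owner`; `SLA_TIER` is dropped because it never matches `__SLA_TIER`.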



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryTopicStats.java:
##########
@@ -415,73 +415,94 @@ private void recordMetricsForTopic(Topic topic) {
             var persistentTopicMetrics = 
persistentTopic.getPersistentTopicMetrics();
 
             var persistentTopicAttributes = 
persistentTopic.getTopicAttributes();
+            // For persistent topics, get custom attributes once and reuse
+            var customLabels = persistentTopicAttributes.getCustomAttributes();
+            var attributesWithCustomLabels = persistentTopicAttributes

Review Comment:
   Custom labels are only merged into the persistent-only metrics below. The 
`AbstractTopic` counters above (`subscription`, `producer`, `consumer`, 
`message`, `bytes`, `publish rate limit`) still record with `attributes`, so 
this change does not actually expose custom labels across all OpenTelemetry 
topic metrics.



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryCustomLabelsTest.java:
##########
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats;
+
+import static 
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
+import static org.assertj.core.api.Assertions.assertThat;
+import io.opentelemetry.api.common.Attributes;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class OpenTelemetryCustomLabelsTest extends BrokerTestBase {
+
+    private static final Set<String> ALLOWED_CUSTOM_METRIC_LABEL_KEYS = 
Set.of("__SLA_TIER", "APP_OWNER");
+
+    @BeforeMethod(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.baseSetup();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Override
+    protected void 
customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder builder) {
+        super.customizeMainPulsarTestContextBuilder(builder);
+        builder.enableOpenTelemetry(true);
+    }
+
+    @Override
+    protected ServiceConfiguration getDefaultConf() {
+        ServiceConfiguration conf = super.getDefaultConf();
+        conf.setTopicLevelPoliciesEnabled(true);
+        conf.setSystemTopicEnabled(true);
+        conf.setExposeCustomTopicMetricLabelsEnabled(true);
+        
conf.setAllowedTopicPropertyKeysForMetrics(ALLOWED_CUSTOM_METRIC_LABEL_KEYS);
+        conf.setBrokerShutdownTimeoutMs(5000L);
+        return conf;
+    }
+
+    @Test(timeOut = 30_000)
+    public void testCustomLabelsInOpenTelemetryMetrics() throws Exception {
+        var topic1 = 
BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/testCustomLabels1");
+        var topic2 = 
BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/testCustomLabels2");
+
+        admin.topics().createNonPartitionedTopic(topic1);
+        admin.topics().createPartitionedTopic(topic2, 2);
+
+        @Cleanup
+        Producer<byte[]> p1 = 
pulsarClient.newProducer().topic(topic1).create();
+        @Cleanup
+        Producer<byte[]> p2 = 
pulsarClient.newProducer().topic(topic2).create();
+
+        @Cleanup
+        Consumer<byte[]> c1 = pulsarClient.newConsumer()
+            .topic(topic1)
+            .subscriptionName("test")
+            .subscribe();
+        @Cleanup
+        Consumer<byte[]> c2 = pulsarClient.newConsumer()
+            .topic(topic2)
+            .subscriptionName("test")
+            .subscribe();
+
+        // Produce and consume messages
+        for (int i = 0; i < 5; i++) {
+            p1.send(("message-" + i).getBytes());
+            p2.send(("message-" + i).getBytes());
+        }
+        for (int i = 0; i < 5; i++) {
+            c1.acknowledge(c1.receive());
+            c2.acknowledge(c2.receive());
+        }
+
+        // Set custom metric labels for topic1
+        Map<String, String> labels1 = new HashMap<>();
+        labels1.put("SLA_TIER", "gold");
+        labels1.put("APP_OWNER", "team-a");
+        admin.topics().updateProperties(topic1, labels1);
+
+        // Set custom metric labels for topic2
+        Map<String, String> labels2 = new HashMap<>();
+        labels2.put("SLA_TIER", "platinum");
+        labels2.put("APP_OWNER", "team-b");
+        admin.topics().updateProperties(topic2, labels2);
+
+        // Wait for labels to be set
+        Awaitility.await().untilAsserted(() -> {
+            var retrievedLabels1 = admin.topics().getProperties(topic1);
+            assertThat(retrievedLabels1.get("sla_tier")).isEqualTo("gold");
+            assertThat(retrievedLabels1.get("app_owner")).isEqualTo("team-a");
+
+            var retrievedLabels2 = admin.topics().getProperties(topic2);
+            assertThat(retrievedLabels2.get("sla_tier")).isEqualTo("platinum");
+            assertThat(retrievedLabels2.get("app_owner")).isEqualTo("team-b");
+        });
+
+        // Collect metrics and verify custom labels are present
+        var metrics = 
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+
+        // Build expected attributes for topic1 with custom labels
+        var attributesTopic1 = Attributes.builder()
+            .put(OpenTelemetryAttributes.PULSAR_DOMAIN, "persistent")
+            .put(OpenTelemetryAttributes.PULSAR_TENANT, "prop")
+            .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, "prop/ns-abc")
+            .put(OpenTelemetryAttributes.PULSAR_TOPIC, topic1)
+            .put("sla_tier", "gold")
+            .put("app_owner", "team-a")
+            .build();
+
+        // Build expected attributes for topic2 with custom labels
+        var attributesTopic2 = Attributes.builder()
+            .put(OpenTelemetryAttributes.PULSAR_DOMAIN, "persistent")
+            .put(OpenTelemetryAttributes.PULSAR_TENANT, "prop")
+            .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, "prop/ns-abc")
+            .put(OpenTelemetryAttributes.PULSAR_TOPIC, topic2)
+            .put("sla_tier", "platinum")
+            .put("app_owner", "team-b")
+            .build();
+
+        // Verify metrics contain custom labels for topic1
+        assertMetricLongSumValue(metrics, 
OpenTelemetryTopicStats.MESSAGE_IN_COUNTER, attributesTopic1, 5);

Review Comment:
   This test asserts on `MESSAGE_IN_COUNTER` / `PRODUCER_COUNTER`, but the 
patch only changes the persistent-topic metrics in the lower block (`storage*`, 
backlog, transaction, compaction, delayed subscription). Even if the test 
passes, it does not verify the metrics that were actually modified here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
