This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 02a7bbce7eb [improve][client] Add OpenTelemetry metrics for client 
memory buffer usage (#24647)
02a7bbce7eb is described below

commit 02a7bbce7eb107a3cc849d54e9d373b1d57659b9
Author: Penghui Li <peng...@apache.org>
AuthorDate: Wed Aug 20 10:30:10 2025 -0700

    [improve][client] Add OpenTelemetry metrics for client memory buffer usage 
(#24647)
    
    (cherry picked from commit a66e8068058664d65fe71d5d711a14a898840b46)
---
 .../pulsar/client/metrics/ClientMetricsTest.java   |  82 +++++++++++
 .../pulsar/client/impl/MemoryLimitController.java  |   4 +
 .../pulsar/client/impl/PulsarClientImpl.java       |  16 +++
 .../client/impl/metrics/InstrumentProvider.java    |   8 ++
 .../client/impl/metrics/MemoryBufferStats.java     |  63 +++++++++
 .../impl/metrics/ObservableUpDownCounter.java      |  79 +++++++++++
 .../client/impl/metrics/MemoryBufferStatsTest.java | 152 +++++++++++++++++++++
 7 files changed, 404 insertions(+)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/metrics/ClientMetricsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/metrics/ClientMetricsTest.java
index 02b38acf865..5728f3d0e01 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/metrics/ClientMetricsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/metrics/ClientMetricsTest.java
@@ -38,8 +38,10 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SizeUnit;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.assertj.core.api.Assertions;
+import org.awaitility.Awaitility;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -343,4 +345,84 @@ public class ClientMetricsTest extends 
ProducerConsumerBase {
         assertCounterValue(metrics, "pulsar.client.consumer.closed", 1, 
nsAttrs);
         assertCounterValue(metrics, "pulsar.client.connection.closed", 1, 
Attributes.empty());
     }
+
+    @Test
+    public void testMemoryBufferMetrics() throws Exception {
+        String topic = newTopicName();
+        long memoryLimit = 1024 * 1024; // 1MB
+
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .openTelemetry(otel)
+                .memoryLimit(memoryLimit, 
org.apache.pulsar.client.api.SizeUnit.BYTES)
+                .build();
+
+        Producer<byte[]> producer = client.newProducer()
+                .topic(topic)
+                .batchingMaxPublishDelay(1, TimeUnit.DAYS)
+                .batchingMaxBytes(1024 * 1024)
+                .create();
+
+        var metrics = collectMetrics();
+
+        // Verify memory buffer limit is reported correctly
+        assertCounterValue(metrics, "pulsar.client.memory.buffer.limit", 
memoryLimit, Attributes.empty());
+
+        // Initially, memory usage should be 0 or very low
+        long initialUsage = getCounterValue(metrics, 
"pulsar.client.memory.buffer.usage", Attributes.empty());
+        
Assertions.assertThat(initialUsage).isGreaterThanOrEqualTo(0).isLessThan(memoryLimit
 / 4);
+
+        producer.sendAsync(new byte[512 * 1024]);
+
+        metrics = collectMetrics();
+
+        // Verify memory usage increased
+        long usageAfterSend = getCounterValue(metrics, 
"pulsar.client.memory.buffer.usage", Attributes.empty());
+        Assertions.assertThat(usageAfterSend).isGreaterThan(initialUsage);
+
+        // Verify limit is still correct
+        assertCounterValue(metrics, "pulsar.client.memory.buffer.limit", 
memoryLimit, Attributes.empty());
+
+        // Flush all pending messages
+        producer.flush();
+
+        Awaitility.await().untilAsserted(() -> {
+            var newMetrics = collectMetrics();
+            // Memory usage should be lower after flushing
+            long usageAfterFlush = getCounterValue(newMetrics, 
"pulsar.client.memory.buffer.usage", Attributes.empty());
+            
Assertions.assertThat(usageAfterFlush).isLessThanOrEqualTo(usageAfterSend);
+        });
+
+        producer.close();
+        client.close();
+    }
+
+    @Test
+    public void testMemoryBufferMetricsWithNoLimit() throws Exception {
+        // Create client without memory limit
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .openTelemetry(otel)
+                .memoryLimit(0L, SizeUnit.BYTES)
+                .build();
+
+        String topic = newTopicName();
+        Producer<String> producer = client.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        producer.send("test message");
+
+        var metrics = collectMetrics();
+
+        // When memory limiting is disabled, buffer metrics should not be 
present at all
+        boolean hasUsageMetric = 
metrics.containsKey("pulsar.client.memory.buffer.usage");
+        boolean hasLimitMetric = 
metrics.containsKey("pulsar.client.memory.buffer.limit");
+
+        // Since memory limiting is disabled, these metrics should not exist
+        Assertions.assertThat(hasUsageMetric).isFalse();
+        Assertions.assertThat(hasLimitMetric).isFalse();
+        producer.close();
+        client.close();
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java
index c15821c0543..d7acfd69128 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java
@@ -145,4 +145,8 @@ public class MemoryLimitController {
     public boolean isMemoryLimited() {
         return memoryLimit > 0;
     }
+
+    public long memoryLimit() {
+        return memoryLimit;
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 950a01afb46..01b27f491e1 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -74,6 +74,7 @@ import 
org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
 import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
+import org.apache.pulsar.client.impl.metrics.MemoryBufferStats;
 import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
 import org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema;
 import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
@@ -146,6 +147,7 @@ public class PulsarClientImpl implements PulsarClient {
 
     protected final EventLoopGroup eventLoopGroup;
     private final MemoryLimitController memoryLimitController;
+    private final MemoryBufferStats memoryBufferStats;
 
     private final LoadingCache<String, SchemaInfoProvider> 
schemaProviderLoadingCache =
             CacheBuilder.newBuilder().maximumSize(100000)
@@ -265,6 +267,12 @@ public class PulsarClientImpl implements PulsarClient {
             memoryLimitController = new 
MemoryLimitController(conf.getMemoryLimitBytes(),
                     (long) (conf.getMemoryLimitBytes() * 
THRESHOLD_FOR_CONSUMER_RECEIVER_QUEUE_SIZE_SHRINKING),
                     this::reduceConsumerReceiverQueueSize);
+            // Only create memory buffer metrics if memory limiting is enabled
+            if (memoryLimitController.isMemoryLimited()) {
+                memoryBufferStats = new MemoryBufferStats(instrumentProvider, 
memoryLimitController);
+            } else {
+                memoryBufferStats = null;
+            }
             state.set(State.Open);
         } catch (Throwable t) {
             // Log the exception first, or it could be missed if there are any 
subsequent exceptions in the
@@ -938,6 +946,14 @@ public class PulsarClientImpl implements PulsarClient {
             } catch (PulsarClientException e) {
                 throwable = e;
             }
+            if (memoryBufferStats != null) {
+                try {
+                    memoryBufferStats.close();
+                } catch (Throwable t) {
+                    log.warn("Failed to close memoryBufferStats", t);
+                    throwable = t;
+                }
+            }
             if (conf != null && conf.getAuthentication() != null) {
                 try {
                     conf.getAuthentication().close();
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/InstrumentProvider.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/InstrumentProvider.java
index 1e02af1fd37..a0bdd8b6fb6 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/InstrumentProvider.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/InstrumentProvider.java
@@ -23,6 +23,8 @@ import io.opentelemetry.api.GlobalOpenTelemetry;
 import io.opentelemetry.api.OpenTelemetry;
 import io.opentelemetry.api.common.Attributes;
 import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.ObservableLongMeasurement;
+import java.util.function.Consumer;
 import org.apache.pulsar.PulsarVersion;
 
 public class InstrumentProvider {
@@ -55,4 +57,10 @@ public class InstrumentProvider {
     public LatencyHistogram newLatencyHistogram(String name, String 
description, String topic, Attributes attributes) {
         return new LatencyHistogram(meter, name, description, topic, 
attributes);
     }
+
+    public ObservableUpDownCounter newObservableUpDownCounter(String name, 
Unit unit, String description,
+                                                            String topic, 
Attributes attributes,
+                                                            
Consumer<ObservableLongMeasurement> callback) {
+        return new ObservableUpDownCounter(meter, name, unit, description, 
topic, attributes, callback);
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/MemoryBufferStats.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/MemoryBufferStats.java
new file mode 100644
index 00000000000..4868562ffd3
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/MemoryBufferStats.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.impl.metrics;
+
+import org.apache.pulsar.client.impl.MemoryLimitController;
+
+public class MemoryBufferStats implements AutoCloseable {
+
+    public static final String BUFFER_USAGE_COUNTER = 
"pulsar.client.memory.buffer.usage";
+    private final ObservableUpDownCounter bufferUsageCounter;
+
+    public static final String BUFFER_LIMIT_COUNTER = 
"pulsar.client.memory.buffer.limit";
+    private final ObservableUpDownCounter bufferLimitCounter;
+
+    public MemoryBufferStats(InstrumentProvider instrumentProvider, 
MemoryLimitController memoryLimitController) {
+        bufferUsageCounter = instrumentProvider.newObservableUpDownCounter(
+                BUFFER_USAGE_COUNTER,
+                Unit.Bytes,
+                "Current memory buffer usage by the client",
+                null, // no topic
+                null, // no extra attributes
+                measurement -> {
+                    if (memoryLimitController.isMemoryLimited()) {
+                        
measurement.record(memoryLimitController.currentUsage());
+                    }
+                });
+
+        bufferLimitCounter = instrumentProvider.newObservableUpDownCounter(
+                BUFFER_LIMIT_COUNTER,
+                Unit.Bytes,
+                "Memory buffer limit configured for the client",
+                null, // no topic
+                null, // no extra attributes
+                measurement -> {
+                    if (memoryLimitController.isMemoryLimited()) {
+                        
measurement.record(memoryLimitController.memoryLimit());
+                    }
+                });
+    }
+
+    @Override
+    public void close() {
+        bufferUsageCounter.close();
+        bufferLimitCounter.close();
+    }
+}
\ No newline at end of file
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ObservableUpDownCounter.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ObservableUpDownCounter.java
new file mode 100644
index 00000000000..c7ca2907be2
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ObservableUpDownCounter.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.impl.metrics;
+
+import static 
org.apache.pulsar.client.impl.metrics.MetricsUtil.getDefaultAggregationLabels;
+import static 
org.apache.pulsar.client.impl.metrics.MetricsUtil.getTopicAttributes;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.incubator.metrics.ExtendedLongUpDownCounterBuilder;
+import io.opentelemetry.api.metrics.LongUpDownCounterBuilder;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.ObservableLongMeasurement;
+import io.opentelemetry.api.metrics.ObservableLongUpDownCounter;
+import java.util.function.Consumer;
+
+public class ObservableUpDownCounter implements AutoCloseable {
+
+    private final ObservableLongUpDownCounter counter;
+
+    ObservableUpDownCounter(Meter meter, String name, Unit unit, String 
description, String topic,
+                           Attributes attributes, 
Consumer<ObservableLongMeasurement> callback) {
+        LongUpDownCounterBuilder builder = meter.upDownCounterBuilder(name)
+                .setDescription(description)
+                .setUnit(unit.toString());
+
+        if (topic != null) {
+            if (builder instanceof ExtendedLongUpDownCounterBuilder) {
+                ExtendedLongUpDownCounterBuilder eb = 
(ExtendedLongUpDownCounterBuilder) builder;
+                
eb.setAttributesAdvice(getDefaultAggregationLabels(attributes));
+            }
+
+            attributes = getTopicAttributes(topic, attributes);
+        }
+
+        final Attributes finalAttributes = attributes;
+        this.counter = builder.buildWithCallback(measurement -> {
+            if (finalAttributes != null && !finalAttributes.isEmpty()) {
+                callback.accept(new AttributeWrappedMeasurement(measurement, 
finalAttributes));
+            } else {
+                callback.accept(measurement);
+            }
+        });
+    }
+
+    @Override
+    public void close() {
+        counter.close();
+    }
+
+    private record AttributeWrappedMeasurement(ObservableLongMeasurement 
delegate,
+                                               Attributes attributes) 
implements ObservableLongMeasurement {
+
+        @Override
+        public void record(long value) {
+            delegate.record(value, attributes);
+        }
+
+        @Override
+        public void record(long value, Attributes attributes) {
+            delegate.record(value, 
this.attributes.toBuilder().putAll(attributes).build());
+        }
+    }
+}
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/metrics/MemoryBufferStatsTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/metrics/MemoryBufferStatsTest.java
new file mode 100644
index 00000000000..81745201be6
--- /dev/null
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/metrics/MemoryBufferStatsTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.impl.metrics;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.opentelemetry.sdk.metrics.data.LongPointData;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
+import java.util.Collection;
+import org.apache.pulsar.client.impl.MemoryLimitController;
+import org.testng.annotations.Test;
+
+public class MemoryBufferStatsTest {
+
+    @Test
+    public void testMemoryBufferStatsWithMemoryUsage() {
+        long memoryLimit = 1024 * 1024; // 1MB
+        MemoryLimitController memoryLimitController = new 
MemoryLimitController(memoryLimit);
+
+        InMemoryMetricReader metricReader = InMemoryMetricReader.create();
+        SdkMeterProvider meterProvider = SdkMeterProvider.builder()
+                .registerMetricReader(metricReader)
+                .build();
+        OpenTelemetry openTelemetry = OpenTelemetrySdk.builder()
+                .setMeterProvider(meterProvider)
+                .build();
+
+        InstrumentProvider instrumentProvider = new 
InstrumentProvider(openTelemetry);
+
+        try (MemoryBufferStats memoryBufferStats = new 
MemoryBufferStats(instrumentProvider, memoryLimitController)) {
+            assertNotNull(memoryBufferStats);
+
+            // Test initial state - no memory used
+            Collection<MetricData> metrics = metricReader.collectAllMetrics();
+            assertUsageMetric(metrics, 0);
+            assertLimitMetric(metrics, memoryLimit);
+
+            // Reserve some memory
+            long reservedMemory = 512 * 1024; // 512KB
+            memoryLimitController.forceReserveMemory(reservedMemory);
+
+            // Collect metrics and verify
+            metrics = metricReader.collectAllMetrics();
+            assertUsageMetric(metrics, reservedMemory);
+            assertLimitMetric(metrics, memoryLimit);
+
+            // Reserve more memory
+            long additionalMemory = 256 * 1024; // 256KB
+            memoryLimitController.forceReserveMemory(additionalMemory);
+
+            // Verify total usage
+            metrics = metricReader.collectAllMetrics();
+            assertUsageMetric(metrics, reservedMemory + additionalMemory);
+            assertLimitMetric(metrics, memoryLimit);
+
+            // Release some memory
+            memoryLimitController.releaseMemory(additionalMemory);
+
+            // Verify usage decreased
+            metrics = metricReader.collectAllMetrics();
+            assertUsageMetric(metrics, reservedMemory);
+            assertLimitMetric(metrics, memoryLimit);
+
+            // Release all memory
+            memoryLimitController.releaseMemory(reservedMemory);
+
+            // Verify back to zero
+            metrics = metricReader.collectAllMetrics();
+            assertUsageMetric(metrics, 0);
+            assertLimitMetric(metrics, memoryLimit);
+        }
+    }
+
+    @Test
+    public void testMemoryBufferStatsWithNoMemoryLimit() {
+        MemoryLimitController memoryLimitController = new 
MemoryLimitController(0); // No limit
+
+        InMemoryMetricReader metricReader = InMemoryMetricReader.create();
+        SdkMeterProvider meterProvider = SdkMeterProvider.builder()
+                .registerMetricReader(metricReader)
+                .build();
+        OpenTelemetry openTelemetry = OpenTelemetrySdk.builder()
+                .setMeterProvider(meterProvider)
+                .build();
+
+        InstrumentProvider instrumentProvider = new 
InstrumentProvider(openTelemetry);
+
+        // When memory limiting is disabled, MemoryBufferStats should not be 
created at all
+        // This test verifies that the callback correctly checks for memory 
limiting
+        try (MemoryBufferStats memoryBufferStats = new 
MemoryBufferStats(instrumentProvider, memoryLimitController)) {
+            assertNotNull(memoryBufferStats);
+
+            // When memory limiting is disabled, no metrics should be recorded
+            Collection<MetricData> metrics = metricReader.collectAllMetrics();
+            assertTrue(metrics.isEmpty() || metrics.stream().noneMatch(metric 
->
+                    
metric.getName().equals(MemoryBufferStats.BUFFER_USAGE_COUNTER)
+                    || 
metric.getName().equals(MemoryBufferStats.BUFFER_LIMIT_COUNTER)));
+        }
+    }
+
+    private void assertUsageMetric(Collection<MetricData> metrics, long 
expectedValue) {
+        MetricData usageMetric = metrics.stream()
+                .filter(metric -> 
metric.getName().equals(MemoryBufferStats.BUFFER_USAGE_COUNTER))
+                .findFirst()
+                .orElse(null);
+
+        assertNotNull(usageMetric, "Usage metric should be present");
+
+        Collection<LongPointData> points = 
usageMetric.getLongSumData().getPoints();
+        assertEquals(points.size(), 1, "Should have exactly one data point");
+
+        LongPointData point = points.iterator().next();
+        assertEquals(point.getValue(), expectedValue, "Usage metric value 
should match expected");
+    }
+
+    private void assertLimitMetric(Collection<MetricData> metrics, long 
expectedValue) {
+        MetricData limitMetric = metrics.stream()
+                .filter(metric -> 
metric.getName().equals(MemoryBufferStats.BUFFER_LIMIT_COUNTER))
+                .findFirst()
+                .orElse(null);
+
+        assertNotNull(limitMetric, "Limit metric should be present");
+
+        Collection<LongPointData> points = 
limitMetric.getLongSumData().getPoints();
+        assertEquals(points.size(), 1, "Should have exactly one data point");
+
+        LongPointData point = points.iterator().next();
+        assertEquals(point.getValue(), expectedValue, "Limit metric value 
should match expected");
+    }
+}
\ No newline at end of file

Reply via email to