This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push: new 02a7bbce7eb [improve][client] Add OpenTelemetry metrics for client memory buffer usage (#24647) 02a7bbce7eb is described below commit 02a7bbce7eb107a3cc849d54e9d373b1d57659b9 Author: Penghui Li <peng...@apache.org> AuthorDate: Wed Aug 20 10:30:10 2025 -0700 [improve][client] Add OpenTelemetry metrics for client memory buffer usage (#24647) (cherry picked from commit a66e8068058664d65fe71d5d711a14a898840b46) --- .../pulsar/client/metrics/ClientMetricsTest.java | 82 +++++++++++ .../pulsar/client/impl/MemoryLimitController.java | 4 + .../pulsar/client/impl/PulsarClientImpl.java | 16 +++ .../client/impl/metrics/InstrumentProvider.java | 8 ++ .../client/impl/metrics/MemoryBufferStats.java | 63 +++++++++ .../impl/metrics/ObservableUpDownCounter.java | 79 +++++++++++ .../client/impl/metrics/MemoryBufferStatsTest.java | 152 +++++++++++++++++++++ 7 files changed, 404 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/metrics/ClientMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/metrics/ClientMetricsTest.java index 02b38acf865..5728f3d0e01 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/metrics/ClientMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/metrics/ClientMetricsTest.java @@ -38,8 +38,10 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SizeUnit; import org.apache.pulsar.client.api.SubscriptionType; import org.assertj.core.api.Assertions; +import org.awaitility.Awaitility; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -343,4 +345,84 @@ public class ClientMetricsTest extends ProducerConsumerBase { assertCounterValue(metrics, "pulsar.client.consumer.closed", 1, nsAttrs); assertCounterValue(metrics, "pulsar.client.connection.closed", 1, Attributes.empty()); } + + @Test + public void testMemoryBufferMetrics() throws Exception { + String topic = newTopicName(); + long memoryLimit = 1024 * 1024; // 1MB + + PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsar.getBrokerServiceUrl()) + .openTelemetry(otel) + .memoryLimit(memoryLimit, org.apache.pulsar.client.api.SizeUnit.BYTES) + .build(); + + Producer<byte[]> producer = client.newProducer() + .topic(topic) + .batchingMaxPublishDelay(1, TimeUnit.DAYS) + .batchingMaxBytes(1024 * 1024) + .create(); + + var metrics = collectMetrics(); + + // Verify memory buffer limit is reported correctly + assertCounterValue(metrics, "pulsar.client.memory.buffer.limit", memoryLimit, Attributes.empty()); + + // Initially, memory usage should be 0 or very low + long initialUsage = getCounterValue(metrics, "pulsar.client.memory.buffer.usage", Attributes.empty()); + Assertions.assertThat(initialUsage).isGreaterThanOrEqualTo(0).isLessThan(memoryLimit / 4); + + producer.sendAsync(new byte[512 * 1024]); + + metrics = collectMetrics(); + + // Verify memory usage increased + long usageAfterSend = getCounterValue(metrics, "pulsar.client.memory.buffer.usage", Attributes.empty()); + Assertions.assertThat(usageAfterSend).isGreaterThan(initialUsage); + + // Verify limit is still correct + assertCounterValue(metrics, "pulsar.client.memory.buffer.limit", memoryLimit, Attributes.empty()); + + // Flush all pending messages + producer.flush(); + + Awaitility.await().untilAsserted(() -> { + var newMetrics = collectMetrics(); + // Memory usage should be lower after flushing + long usageAfterFlush = getCounterValue(newMetrics, "pulsar.client.memory.buffer.usage", Attributes.empty()); + Assertions.assertThat(usageAfterFlush).isLessThanOrEqualTo(usageAfterSend); + }); + + producer.close(); + client.close(); + } + + @Test + public void testMemoryBufferMetricsWithNoLimit() throws Exception { + // Create client without memory limit + PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsar.getBrokerServiceUrl()) + .openTelemetry(otel) + .memoryLimit(0L, SizeUnit.BYTES) + .build(); + + String topic = newTopicName(); + Producer<String> producer = client.newProducer(Schema.STRING) + .topic(topic) + .create(); + + producer.send("test message"); + + var metrics = collectMetrics(); + + // When memory limiting is disabled, buffer metrics should not be present at all + boolean hasUsageMetric = metrics.containsKey("pulsar.client.memory.buffer.usage"); + boolean hasLimitMetric = metrics.containsKey("pulsar.client.memory.buffer.limit"); + + // Since memory limiting is disabled, these metrics should not exist + Assertions.assertThat(hasUsageMetric).isFalse(); + Assertions.assertThat(hasLimitMetric).isFalse(); + producer.close(); + client.close(); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java index c15821c0543..d7acfd69128 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java @@ -145,4 +145,8 @@ public class MemoryLimitController { public boolean isMemoryLimited() { return memoryLimit > 0; } + + public long memoryLimit() { + return memoryLimit; + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 950a01afb46..01b27f491e1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -74,6 +74,7 @@ import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; import org.apache.pulsar.client.impl.conf.ReaderConfigurationData; import org.apache.pulsar.client.impl.metrics.InstrumentProvider; +import org.apache.pulsar.client.impl.metrics.MemoryBufferStats; import org.apache.pulsar.client.impl.schema.AutoConsumeSchema; import org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema; import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema; @@ -146,6 +147,7 @@ public class PulsarClientImpl implements PulsarClient { protected final EventLoopGroup eventLoopGroup; private final MemoryLimitController memoryLimitController; + private final MemoryBufferStats memoryBufferStats; private final LoadingCache<String, SchemaInfoProvider> schemaProviderLoadingCache = CacheBuilder.newBuilder().maximumSize(100000) @@ -265,6 +267,12 @@ public class PulsarClientImpl implements PulsarClient { memoryLimitController = new MemoryLimitController(conf.getMemoryLimitBytes(), (long) (conf.getMemoryLimitBytes() * THRESHOLD_FOR_CONSUMER_RECEIVER_QUEUE_SIZE_SHRINKING), this::reduceConsumerReceiverQueueSize); + // Only create memory buffer metrics if memory limiting is enabled + if (memoryLimitController.isMemoryLimited()) { + memoryBufferStats = new MemoryBufferStats(instrumentProvider, memoryLimitController); + } else { + memoryBufferStats = null; + } state.set(State.Open); } catch (Throwable t) { // Log the exception first, or it could be missed if there are any subsequent exceptions in the @@ -938,6 +946,14 @@ public class PulsarClientImpl implements PulsarClient { } catch (PulsarClientException e) { throwable = e; } + if (memoryBufferStats != null) { + try { + memoryBufferStats.close(); + } catch (Throwable t) { + log.warn("Failed to close memoryBufferStats", t); + throwable = t; + } + } if (conf != null && conf.getAuthentication() != null) { try { conf.getAuthentication().close(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/InstrumentProvider.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/InstrumentProvider.java index 1e02af1fd37..a0bdd8b6fb6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/InstrumentProvider.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/InstrumentProvider.java @@ -23,6 +23,8 @@ import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.metrics.ObservableLongMeasurement; +import java.util.function.Consumer; import org.apache.pulsar.PulsarVersion; public class InstrumentProvider { @@ -55,4 +57,10 @@ public class InstrumentProvider { public LatencyHistogram newLatencyHistogram(String name, String description, String topic, Attributes attributes) { return new LatencyHistogram(meter, name, description, topic, attributes); } + + public ObservableUpDownCounter newObservableUpDownCounter(String name, Unit unit, String description, + String topic, Attributes attributes, + Consumer<ObservableLongMeasurement> callback) { + return new ObservableUpDownCounter(meter, name, unit, description, topic, attributes, callback); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/MemoryBufferStats.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/MemoryBufferStats.java new file mode 100644 index 00000000000..4868562ffd3 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/MemoryBufferStats.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.client.impl.metrics; + +import org.apache.pulsar.client.impl.MemoryLimitController; + +public class MemoryBufferStats implements AutoCloseable { + + public static final String BUFFER_USAGE_COUNTER = "pulsar.client.memory.buffer.usage"; + private final ObservableUpDownCounter bufferUsageCounter; + + public static final String BUFFER_LIMIT_COUNTER = "pulsar.client.memory.buffer.limit"; + private final ObservableUpDownCounter bufferLimitCounter; + + public MemoryBufferStats(InstrumentProvider instrumentProvider, MemoryLimitController memoryLimitController) { + bufferUsageCounter = instrumentProvider.newObservableUpDownCounter( + BUFFER_USAGE_COUNTER, + Unit.Bytes, + "Current memory buffer usage by the client", + null, // no topic + null, // no extra attributes + measurement -> { + if (memoryLimitController.isMemoryLimited()) { + measurement.record(memoryLimitController.currentUsage()); + } + }); + + bufferLimitCounter = instrumentProvider.newObservableUpDownCounter( + BUFFER_LIMIT_COUNTER, + Unit.Bytes, + "Memory buffer limit configured for the client", + null, // no topic + null, // no extra attributes + measurement -> { + if (memoryLimitController.isMemoryLimited()) { + measurement.record(memoryLimitController.memoryLimit()); + } + }); + } + + @Override + public void close() { + bufferUsageCounter.close(); + bufferLimitCounter.close(); + } +} \ No newline at end of file diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ObservableUpDownCounter.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ObservableUpDownCounter.java new file mode 100644 index 00000000000..c7ca2907be2 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ObservableUpDownCounter.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.client.impl.metrics; + +import static org.apache.pulsar.client.impl.metrics.MetricsUtil.getDefaultAggregationLabels; +import static org.apache.pulsar.client.impl.metrics.MetricsUtil.getTopicAttributes; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.incubator.metrics.ExtendedLongUpDownCounterBuilder; +import io.opentelemetry.api.metrics.LongUpDownCounterBuilder; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.metrics.ObservableLongMeasurement; +import io.opentelemetry.api.metrics.ObservableLongUpDownCounter; +import java.util.function.Consumer; + +public class ObservableUpDownCounter implements AutoCloseable { + + private final ObservableLongUpDownCounter counter; + + ObservableUpDownCounter(Meter meter, String name, Unit unit, String description, String topic, + Attributes attributes, Consumer<ObservableLongMeasurement> callback) { + LongUpDownCounterBuilder builder = meter.upDownCounterBuilder(name) + .setDescription(description) + .setUnit(unit.toString()); + + if (topic != null) { + if (builder instanceof ExtendedLongUpDownCounterBuilder) { + ExtendedLongUpDownCounterBuilder eb = (ExtendedLongUpDownCounterBuilder) builder; + eb.setAttributesAdvice(getDefaultAggregationLabels(attributes)); + } + + attributes = getTopicAttributes(topic, attributes); + } + + final Attributes finalAttributes = attributes; + this.counter = builder.buildWithCallback(measurement -> { + if (finalAttributes != null && !finalAttributes.isEmpty()) { + callback.accept(new AttributeWrappedMeasurement(measurement, finalAttributes)); + } else { + callback.accept(measurement); + } + }); + } + + @Override + public void close() { + counter.close(); + } + + private record AttributeWrappedMeasurement(ObservableLongMeasurement delegate, + Attributes attributes) implements ObservableLongMeasurement { + + @Override + public void record(long value) { + delegate.record(value, attributes); + } + + @Override + public void record(long value, Attributes attributes) { + delegate.record(value, this.attributes.toBuilder().putAll(attributes).build()); + } + } +} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/metrics/MemoryBufferStatsTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/metrics/MemoryBufferStatsTest.java new file mode 100644 index 00000000000..81745201be6 --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/metrics/MemoryBufferStatsTest.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.client.impl.metrics; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.data.LongPointData; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; +import java.util.Collection; +import org.apache.pulsar.client.impl.MemoryLimitController; +import org.testng.annotations.Test; + +public class MemoryBufferStatsTest { + + @Test + public void testMemoryBufferStatsWithMemoryUsage() { + long memoryLimit = 1024 * 1024; // 1MB + MemoryLimitController memoryLimitController = new MemoryLimitController(memoryLimit); + + InMemoryMetricReader metricReader = InMemoryMetricReader.create(); + SdkMeterProvider meterProvider = SdkMeterProvider.builder() + .registerMetricReader(metricReader) + .build(); + OpenTelemetry openTelemetry = OpenTelemetrySdk.builder() + .setMeterProvider(meterProvider) + .build(); + + InstrumentProvider instrumentProvider = new InstrumentProvider(openTelemetry); + + try (MemoryBufferStats memoryBufferStats = new MemoryBufferStats(instrumentProvider, memoryLimitController)) { + assertNotNull(memoryBufferStats); + + // Test initial state - no memory used + Collection<MetricData> metrics = metricReader.collectAllMetrics(); + assertUsageMetric(metrics, 0); + assertLimitMetric(metrics, memoryLimit); + + // Reserve some memory + long reservedMemory = 512 * 1024; // 512KB + memoryLimitController.forceReserveMemory(reservedMemory); + + // Collect metrics and verify + metrics = metricReader.collectAllMetrics(); + assertUsageMetric(metrics, reservedMemory); + assertLimitMetric(metrics, memoryLimit); + + // Reserve more memory + long additionalMemory = 256 * 1024; // 256KB + memoryLimitController.forceReserveMemory(additionalMemory); + + // Verify total usage + metrics = metricReader.collectAllMetrics(); + assertUsageMetric(metrics, reservedMemory + additionalMemory); + assertLimitMetric(metrics, memoryLimit); + + // Release some memory + memoryLimitController.releaseMemory(additionalMemory); + + // Verify usage decreased + metrics = metricReader.collectAllMetrics(); + assertUsageMetric(metrics, reservedMemory); + assertLimitMetric(metrics, memoryLimit); + + // Release all memory + memoryLimitController.releaseMemory(reservedMemory); + + // Verify back to zero + metrics = metricReader.collectAllMetrics(); + assertUsageMetric(metrics, 0); + assertLimitMetric(metrics, memoryLimit); + } + } + + @Test + public void testMemoryBufferStatsWithNoMemoryLimit() { + MemoryLimitController memoryLimitController = new MemoryLimitController(0); // No limit + + InMemoryMetricReader metricReader = InMemoryMetricReader.create(); + SdkMeterProvider meterProvider = SdkMeterProvider.builder() + .registerMetricReader(metricReader) + .build(); + OpenTelemetry openTelemetry = OpenTelemetrySdk.builder() + .setMeterProvider(meterProvider) + .build(); + + InstrumentProvider instrumentProvider = new InstrumentProvider(openTelemetry); + + // When memory limiting is disabled, MemoryBufferStats should not be created at all + // This test verifies that the callback correctly checks for memory limiting + try (MemoryBufferStats memoryBufferStats = new MemoryBufferStats(instrumentProvider, memoryLimitController)) { + assertNotNull(memoryBufferStats); + + // When memory limiting is disabled, no metrics should be recorded + Collection<MetricData> metrics = metricReader.collectAllMetrics(); + assertTrue(metrics.isEmpty() || metrics.stream().noneMatch(metric -> + metric.getName().equals(MemoryBufferStats.BUFFER_USAGE_COUNTER) + || metric.getName().equals(MemoryBufferStats.BUFFER_LIMIT_COUNTER))); + } + } + + private void assertUsageMetric(Collection<MetricData> metrics, long expectedValue) { + MetricData usageMetric = metrics.stream() + .filter(metric -> metric.getName().equals(MemoryBufferStats.BUFFER_USAGE_COUNTER)) + .findFirst() + .orElse(null); + + assertNotNull(usageMetric, "Usage metric should be present"); + + Collection<LongPointData> points = usageMetric.getLongSumData().getPoints(); + assertEquals(points.size(), 1, "Should have exactly one data point"); + + LongPointData point = points.iterator().next(); + assertEquals(point.getValue(), expectedValue, "Usage metric value should match expected"); + } + + private void assertLimitMetric(Collection<MetricData> metrics, long expectedValue) { + MetricData limitMetric = metrics.stream() + .filter(metric -> metric.getName().equals(MemoryBufferStats.BUFFER_LIMIT_COUNTER)) + .findFirst() + .orElse(null); + + assertNotNull(limitMetric, "Limit metric should be present"); + + Collection<LongPointData> points = limitMetric.getLongSumData().getPoints(); + assertEquals(points.size(), 1, "Should have exactly one data point"); + + LongPointData point = points.iterator().next(); + assertEquals(point.getValue(), expectedValue, "Limit metric value should match expected"); + } +} \ No newline at end of file