This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 91121d3a09b [feat][monitor] Add ML write latency histogram and entry size histogram as OTel metrics (#24815)
91121d3a09b is described below

commit 91121d3a09b5966c31f324e21a20d9b654b515ae
Author: Yuri Mizushima <[email protected]>
AuthorDate: Thu Oct 9 00:57:13 2025 +0900

    [feat][monitor] Add ML write latency histogram and entry size histogram as OTel metrics (#24815)
---
 .../mledger/ManagedLedgerAttributes.java           |  4 +
 .../mledger/impl/ManagedLedgerFactoryImpl.java     |  1 +
 .../mledger/impl/ManagedLedgerMBeanImpl.java       |  8 ++
 .../impl/OpenTelemetryManagedLedgerStats.java      | 87 ++++++++++++++++++++++
 .../broker/stats/ManagedLedgerMetricsTest.java     | 14 ++++
 5 files changed, 114 insertions(+)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerAttributes.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerAttributes.java
index c3759a533a5..b1d777dbbf2 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerAttributes.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerAttributes.java
@@ -28,6 +28,7 @@ import 
org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.ManagedLedgerOper
 public class ManagedLedgerAttributes {
 
     private final Attributes attributes;
+    private final Attributes attributesOnlyNamespace;
     private final Attributes attributesOperationSucceed;
     private final Attributes attributesOperationFailure;
 
@@ -37,6 +38,9 @@ public class ManagedLedgerAttributes {
                 OpenTelemetryAttributes.ML_NAME, mlName,
                 OpenTelemetryAttributes.PULSAR_NAMESPACE, getNamespace(mlName)
         );
+        attributesOnlyNamespace = Attributes.of(
+                OpenTelemetryAttributes.PULSAR_NAMESPACE, getNamespace(mlName)
+        );
         attributesOperationSucceed = Attributes.builder()
                 .putAll(attributes)
                 .putAll(ManagedLedgerOperationStatus.SUCCESS.attributes)
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 8a4f0fa3a3c..a452c6682a5 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -138,6 +138,7 @@ public class ManagedLedgerFactoryImpl implements 
ManagedLedgerFactory {
     private final MetadataStore metadataStore;
 
     private final OpenTelemetryManagedLedgerCacheStats openTelemetryCacheStats;
+    @Getter
     private final OpenTelemetryManagedLedgerStats 
openTelemetryManagedLedgerStats;
     private final OpenTelemetryManagedCursorStats 
openTelemetryManagedCursorStats;
 
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
index 634afccf6ac..92e3052588a 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
@@ -87,6 +87,8 @@ public class ManagedLedgerMBeanImpl implements 
ManagedLedgerMXBean {
     public void addAddEntrySample(long size) {
         addEntryOps.recordEvent(size);
         entryStats.addValue(size);
+        managedLedger.getFactory().getOpenTelemetryManagedLedgerStats()
+                .recordEntrySize(size, managedLedger);
         addEntryWithReplicasOps.recordEvent(size * 
managedLedger.getConfig().getWriteQuorumSize());
     }
 
@@ -108,14 +110,20 @@ public class ManagedLedgerMBeanImpl implements 
ManagedLedgerMXBean {
 
     public void addAddEntryLatencySample(long latency, TimeUnit unit) {
         addEntryLatencyStatsUsec.addValue(unit.toMicros(latency));
+        managedLedger.getFactory().getOpenTelemetryManagedLedgerStats()
+                .recordAddEntryLatency(latency, unit, managedLedger);
     }
 
     public void addLedgerAddEntryLatencySample(long latency, TimeUnit unit) {
         ledgerAddEntryLatencyStatsUsec.addValue(unit.toMicros(latency));
+        managedLedger.getFactory().getOpenTelemetryManagedLedgerStats()
+                .recordLedgerAddEntryLatency(latency, unit, managedLedger);
     }
 
     public void addLedgerSwitchLatencySample(long latency, TimeUnit unit) {
         ledgerSwitchLatencyStatsUsec.addValue(unit.toMicros(latency));
+        managedLedger.getFactory().getOpenTelemetryManagedLedgerStats()
+                .recordLedgerSwitchLatency(latency, unit, managedLedger);
     }
 
     public void addReadEntriesSample(int count, long totalSize) {
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedLedgerStats.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedLedgerStats.java
index 26c4b62cf76..6e86532bdd8 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedLedgerStats.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedLedgerStats.java
@@ -20,12 +20,18 @@ package org.apache.bookkeeper.mledger.impl;
 
 import io.opentelemetry.api.OpenTelemetry;
 import io.opentelemetry.api.metrics.BatchCallback;
+import io.opentelemetry.api.metrics.DoubleHistogram;
+import io.opentelemetry.api.metrics.LongHistogram;
 import io.opentelemetry.api.metrics.ObservableLongMeasurement;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.pulsar.opentelemetry.Constants;
 
 public class OpenTelemetryManagedLedgerStats implements AutoCloseable {
 
+    // ml-level metrics
+
     // Replaces pulsar_ml_AddEntryMessagesRate
     public static final String ADD_ENTRY_COUNTER = 
"pulsar.broker.managed_ledger.message.outgoing.count";
     private final ObservableLongMeasurement addEntryCounter;
@@ -62,6 +68,34 @@ public class OpenTelemetryManagedLedgerStats implements 
AutoCloseable {
 
     private final BatchCallback batchCallback;
 
+    // namespace-level metrics
+
+    // Histograms support only synchronous mode, so record measurements 
directly.
+    // Synchronous histograms currently do not support delete operations.
+    // Therefore, use only namespace-level attributes to avoid leaking 
high-cardinality attributes (e.g. topic name).
+    // See: https://github.com/apache/pulsar/blob/master/pip/pip-264.md
+
+    // Replaces ['pulsar_ml_AddEntryLatencyBuckets', 
'pulsar_ml_AddEntryLatencyBuckets_OVERFLOW',
+    //           'pulsar_storage_write_latency_*']
+    public static final String ADD_ENTRY_LATENCY_HISTOGRAM = 
"pulsar.broker.managed_ledger.message.outgoing.latency";
+    private final DoubleHistogram addEntryLatencyHistogram;
+
+    // Replaces ['pulsar_ml_LedgerAddEntryLatencyBuckets', 
'pulsar_ml_LedgerAddEntryLatencyBuckets_OVERFLOW',
+    //           'pulsar_storage_ledger_write_latency_*']
+    public static final String LEDGER_ADD_ENTRY_LATENCY_HISTOGRAM =
+            "pulsar.broker.managed_ledger.message.outgoing.ledger.latency";
+    private final DoubleHistogram ledgerAddEntryLatencyHistogram;
+
+    // Replaces ['pulsar_ml_LedgerSwitchLatencyBuckets', 
'pulsar_ml_LedgerSwitchLatencyBuckets_OVERFLOW']
+    public static final String LEDGER_SWITCH_LATENCY_HISTOGRAM =
+            "pulsar.broker.managed_ledger.ledger.switch.latency";
+    private final DoubleHistogram ledgerSwitchLatencyHistogram;
+
+    // Replaces ['pulsar_ml_EntrySizeBuckets', 
'pulsar_ml_EntrySizeBuckets_OVERFLOW',
+    //           'pulsar_entry_size_*']
+    public static final String ENTRY_SIZE_HISTOGRAM = 
"pulsar.broker.managed_ledger.entry.size";
+    private final LongHistogram entrySizeHistogram;
+
     public OpenTelemetryManagedLedgerStats(OpenTelemetry openTelemetry, 
ManagedLedgerFactoryImpl factory) {
         var meter = 
openTelemetry.getMeter(Constants.BROKER_INSTRUMENTATION_SCOPE_NAME);
 
@@ -124,6 +158,39 @@ public class OpenTelemetryManagedLedgerStats implements 
AutoCloseable {
                 bytesInCounter,
                 readEntryCacheMissCounter,
                 markDeleteCounter);
+
+        addEntryLatencyHistogram = meter
+                .histogramBuilder(ADD_ENTRY_LATENCY_HISTOGRAM)
+                .setDescription("End-to-end write latency, including time 
spent in the executor queue.")
+                .setUnit("s")
+                .setExplicitBucketBoundariesAdvice(Arrays.asList(0.001, 0.005, 
0.01, 0.02, 0.05, 0.1,
+                        0.2, 0.5, 1.0, 5.0, 30.0))
+                .build();
+
+        ledgerAddEntryLatencyHistogram = meter
+                .histogramBuilder(LEDGER_ADD_ENTRY_LATENCY_HISTOGRAM)
+                .setDescription("End-to end write latency.")
+                .setUnit("s")
+                .setExplicitBucketBoundariesAdvice(Arrays.asList(0.001, 0.005, 
0.01, 0.02, 0.05, 0.1,
+                        0.2, 0.5, 1.0, 5.0, 30.0))
+                .build();
+
+        ledgerSwitchLatencyHistogram = meter
+                .histogramBuilder(LEDGER_SWITCH_LATENCY_HISTOGRAM)
+                .setDescription("Time taken to switch to a new ledger.")
+                .setUnit("s")
+                .setExplicitBucketBoundariesAdvice(Arrays.asList(0.001, 0.005, 
0.01, 0.02, 0.05, 0.1,
+                        0.2, 0.5, 1.0, 5.0, 30.0))
+                .build();
+
+        entrySizeHistogram = meter
+                .histogramBuilder(ENTRY_SIZE_HISTOGRAM)
+                .ofLongs()
+                .setDescription("Size of entries written to the ledger.")
+                .setUnit("By")
+                .setExplicitBucketBoundariesAdvice(Arrays.asList(128L, 512L, 
1024L, 2048L, 4096L, 16_384L,
+                        102_400L, 1_048_576L))
+                .build();
     }
 
     @Override
@@ -151,4 +218,65 @@ public class OpenTelemetryManagedLedgerStats implements AutoCloseable {
         markDeleteCounter.record(stats.getMarkDeleteTotal(), attributes);
         readEntryCacheMissCounter.record(stats.getReadEntriesOpsCacheMissesTotal(), attributes);
     }
+
+    /**
+     * Records one end-to-end add-entry latency sample into {@link #ADD_ENTRY_LATENCY_HISTOGRAM}.
+     *
+     * @param latency the measured latency, expressed in {@code unit}
+     * @param unit    the time unit of {@code latency}
+     * @param ml      the managed ledger that performed the write; only its namespace is
+     *                used as an attribute
+     */
+    void recordAddEntryLatency(long latency, TimeUnit unit, ManagedLedger ml) {
+        final var attributes = ml.getManagedLedgerAttributes().getAttributesOnlyNamespace();
+        this.addEntryLatencyHistogram.record(toSeconds(latency, unit), attributes);
+    }
+
+    /**
+     * Records one ledger-level add-entry latency sample into
+     * {@link #LEDGER_ADD_ENTRY_LATENCY_HISTOGRAM}.
+     *
+     * @param latency the measured latency, expressed in {@code unit}
+     * @param unit    the time unit of {@code latency}
+     * @param ml      the managed ledger that performed the write; only its namespace is
+     *                used as an attribute
+     */
+    void recordLedgerAddEntryLatency(long latency, TimeUnit unit, ManagedLedger ml) {
+        final var attributes = ml.getManagedLedgerAttributes().getAttributesOnlyNamespace();
+        this.ledgerAddEntryLatencyHistogram.record(toSeconds(latency, unit), attributes);
+    }
+
+    /**
+     * Records one ledger-switch latency sample into {@link #LEDGER_SWITCH_LATENCY_HISTOGRAM}.
+     *
+     * @param latency the measured latency, expressed in {@code unit}
+     * @param unit    the time unit of {@code latency}
+     * @param ml      the managed ledger that switched ledgers; only its namespace is
+     *                used as an attribute
+     */
+    void recordLedgerSwitchLatency(long latency, TimeUnit unit, ManagedLedger ml) {
+        final var attributes = ml.getManagedLedgerAttributes().getAttributesOnlyNamespace();
+        this.ledgerSwitchLatencyHistogram.record(toSeconds(latency, unit), attributes);
+    }
+
+    /**
+     * Records the size of one written entry into {@link #ENTRY_SIZE_HISTOGRAM}.
+     *
+     * @param entrySize the entry size in bytes
+     * @param ml        the managed ledger the entry was written to; only its namespace is
+     *                  used as an attribute
+     */
+    void recordEntrySize(long entrySize, ManagedLedger ml) {
+        final var attributes = ml.getManagedLedgerAttributes().getAttributesOnlyNamespace();
+        this.entrySizeHistogram.record(entrySize, attributes);
+    }
+
+    /**
+     * Converts a duration to fractional seconds, the unit the latency histograms are declared in.
+     * The conversion goes through nanoseconds: converting through milliseconds first would
+     * truncate every sub-millisecond sample to 0, below the smallest 1 ms bucket boundary.
+     */
+    private static double toSeconds(long latency, TimeUnit unit) {
+        return unit.toNanos(latency) / (double) TimeUnit.SECONDS.toNanos(1);
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java
index 87e751fcc59..394af363d99 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.stats;
 
+import static 
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricHistogramValue;
 import static 
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
 import static 
org.apache.pulsar.transaction.coordinator.impl.DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -135,6 +136,9 @@ public class ManagedLedgerMetricsTest extends 
BrokerTestBase {
                 OpenTelemetryAttributes.ML_NAME, mlName,
                 OpenTelemetryAttributes.PULSAR_NAMESPACE, 
topicNameObj.getNamespace()
         );
+        final var attribOnlyNamespace = Attributes.of(
+                OpenTelemetryAttributes.PULSAR_NAMESPACE, 
topicNameObj.getNamespace()
+        );
         var metricReader = pulsarTestContext.getOpenTelemetryMetricReader();
 
         Awaitility.await().untilAsserted(() -> {
@@ -189,6 +193,16 @@ public class ManagedLedgerMetricsTest extends 
BrokerTestBase {
                     value -> assertThat(value).isGreaterThanOrEqualTo(0));
             assertMetricLongSumValue(otelMetrics, 
OpenTelemetryManagedLedgerStats.READ_ENTRY_CACHE_MISS_COUNTER,
                     attribCommon, value -> 
assertThat(value).isGreaterThanOrEqualTo(0));
+
+            assertMetricHistogramValue(otelMetrics, 
OpenTelemetryManagedLedgerStats.ADD_ENTRY_LATENCY_HISTOGRAM,
+                    attribOnlyNamespace, count -> 
assertThat(count).isEqualTo(15L),
+                    sum -> assertThat(sum).isGreaterThan(0.0));
+            assertMetricHistogramValue(otelMetrics, 
OpenTelemetryManagedLedgerStats.LEDGER_ADD_ENTRY_LATENCY_HISTOGRAM,
+                    attribOnlyNamespace, count -> 
assertThat(count).isEqualTo(15L),
+                    sum -> assertThat(sum).isGreaterThan(0.0));
+            assertMetricHistogramValue(otelMetrics, 
OpenTelemetryManagedLedgerStats.ENTRY_SIZE_HISTOGRAM,
+                    attribOnlyNamespace, count -> 
assertThat(count).isEqualTo(15L),
+                    sum -> assertThat(sum).isGreaterThan(0.0));
         });
     }
 

Reply via email to