This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 91121d3a09b [feat][monitor] Add ML write latency histogram and entry
size histogram as OTel metrics (#24815)
91121d3a09b is described below
commit 91121d3a09b5966c31f324e21a20d9b654b515ae
Author: Yuri Mizushima <[email protected]>
AuthorDate: Thu Oct 9 00:57:13 2025 +0900
[feat][monitor] Add ML write latency histogram and entry size histogram as
OTel metrics (#24815)
---
.../mledger/ManagedLedgerAttributes.java | 4 +
.../mledger/impl/ManagedLedgerFactoryImpl.java | 1 +
.../mledger/impl/ManagedLedgerMBeanImpl.java | 8 ++
.../impl/OpenTelemetryManagedLedgerStats.java | 87 ++++++++++++++++++++++
.../broker/stats/ManagedLedgerMetricsTest.java | 14 ++++
5 files changed, 114 insertions(+)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerAttributes.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerAttributes.java
index c3759a533a5..b1d777dbbf2 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerAttributes.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerAttributes.java
@@ -28,6 +28,7 @@ import
org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.ManagedLedgerOper
public class ManagedLedgerAttributes {
private final Attributes attributes;
+ private final Attributes attributesOnlyNamespace;
private final Attributes attributesOperationSucceed;
private final Attributes attributesOperationFailure;
@@ -37,6 +38,9 @@ public class ManagedLedgerAttributes {
OpenTelemetryAttributes.ML_NAME, mlName,
OpenTelemetryAttributes.PULSAR_NAMESPACE, getNamespace(mlName)
);
+ attributesOnlyNamespace = Attributes.of(
+ OpenTelemetryAttributes.PULSAR_NAMESPACE, getNamespace(mlName)
+ );
attributesOperationSucceed = Attributes.builder()
.putAll(attributes)
.putAll(ManagedLedgerOperationStatus.SUCCESS.attributes)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 8a4f0fa3a3c..a452c6682a5 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -138,6 +138,7 @@ public class ManagedLedgerFactoryImpl implements
ManagedLedgerFactory {
private final MetadataStore metadataStore;
private final OpenTelemetryManagedLedgerCacheStats openTelemetryCacheStats;
+ @Getter
private final OpenTelemetryManagedLedgerStats
openTelemetryManagedLedgerStats;
private final OpenTelemetryManagedCursorStats
openTelemetryManagedCursorStats;
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
index 634afccf6ac..92e3052588a 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
@@ -87,6 +87,8 @@ public class ManagedLedgerMBeanImpl implements
ManagedLedgerMXBean {
public void addAddEntrySample(long size) {
addEntryOps.recordEvent(size);
entryStats.addValue(size);
+ managedLedger.getFactory().getOpenTelemetryManagedLedgerStats()
+ .recordEntrySize(size, managedLedger);
addEntryWithReplicasOps.recordEvent(size *
managedLedger.getConfig().getWriteQuorumSize());
}
@@ -108,14 +110,20 @@ public class ManagedLedgerMBeanImpl implements
ManagedLedgerMXBean {
public void addAddEntryLatencySample(long latency, TimeUnit unit) {
addEntryLatencyStatsUsec.addValue(unit.toMicros(latency));
+ managedLedger.getFactory().getOpenTelemetryManagedLedgerStats()
+ .recordAddEntryLatency(latency, unit, managedLedger);
}
public void addLedgerAddEntryLatencySample(long latency, TimeUnit unit) {
ledgerAddEntryLatencyStatsUsec.addValue(unit.toMicros(latency));
+ managedLedger.getFactory().getOpenTelemetryManagedLedgerStats()
+ .recordLedgerAddEntryLatency(latency, unit, managedLedger);
}
public void addLedgerSwitchLatencySample(long latency, TimeUnit unit) {
ledgerSwitchLatencyStatsUsec.addValue(unit.toMicros(latency));
+ managedLedger.getFactory().getOpenTelemetryManagedLedgerStats()
+ .recordLedgerSwitchLatency(latency, unit, managedLedger);
}
public void addReadEntriesSample(int count, long totalSize) {
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedLedgerStats.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedLedgerStats.java
index 26c4b62cf76..6e86532bdd8 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedLedgerStats.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedLedgerStats.java
@@ -20,12 +20,18 @@ package org.apache.bookkeeper.mledger.impl;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.metrics.BatchCallback;
+import io.opentelemetry.api.metrics.DoubleHistogram;
+import io.opentelemetry.api.metrics.LongHistogram;
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.opentelemetry.Constants;
public class OpenTelemetryManagedLedgerStats implements AutoCloseable {
+ // ml-level metrics
+
// Replaces pulsar_ml_AddEntryMessagesRate
public static final String ADD_ENTRY_COUNTER =
"pulsar.broker.managed_ledger.message.outgoing.count";
private final ObservableLongMeasurement addEntryCounter;
@@ -62,6 +68,34 @@ public class OpenTelemetryManagedLedgerStats implements
AutoCloseable {
private final BatchCallback batchCallback;
+ // namespace-level metrics
+
+ // Histograms only support synchronous recording, so measurements are recorded directly.
+ // Synchronous histograms currently cannot delete an attribute set once it has been recorded.
+ // Therefore, only namespace-level attributes are used, so high-cardinality attributes (e.g. topic name) cannot accumulate series that are never removed.
+ // See: https://github.com/apache/pulsar/blob/master/pip/pip-264.md
+
+ // Replaces ['pulsar_ml_AddEntryLatencyBuckets',
'pulsar_ml_AddEntryLatencyBuckets_OVERFLOW',
+ // 'pulsar_storage_write_latency_*']
+ public static final String ADD_ENTRY_LATENCY_HISTOGRAM =
"pulsar.broker.managed_ledger.message.outgoing.latency";
+ private final DoubleHistogram addEntryLatencyHistogram;
+
+ // Replaces ['pulsar_ml_LedgerAddEntryLatencyBuckets',
'pulsar_ml_LedgerAddEntryLatencyBuckets_OVERFLOW',
+ // 'pulsar_storage_ledger_write_latency_*']
+ public static final String LEDGER_ADD_ENTRY_LATENCY_HISTOGRAM =
+ "pulsar.broker.managed_ledger.message.outgoing.ledger.latency";
+ private final DoubleHistogram ledgerAddEntryLatencyHistogram;
+
+ // Replaces ['pulsar_ml_LedgerSwitchLatencyBuckets',
'pulsar_ml_LedgerSwitchLatencyBuckets_OVERFLOW']
+ public static final String LEDGER_SWITCH_LATENCY_HISTOGRAM =
+ "pulsar.broker.managed_ledger.ledger.switch.latency";
+ private final DoubleHistogram ledgerSwitchLatencyHistogram;
+
+ // Replaces ['pulsar_ml_EntrySizeBuckets',
'pulsar_ml_EntrySizeBuckets_OVERFLOW',
+ // 'pulsar_entry_size_*']
+ public static final String ENTRY_SIZE_HISTOGRAM =
"pulsar.broker.managed_ledger.entry.size";
+ private final LongHistogram entrySizeHistogram;
+
public OpenTelemetryManagedLedgerStats(OpenTelemetry openTelemetry,
ManagedLedgerFactoryImpl factory) {
var meter =
openTelemetry.getMeter(Constants.BROKER_INSTRUMENTATION_SCOPE_NAME);
@@ -124,6 +158,39 @@ public class OpenTelemetryManagedLedgerStats implements
AutoCloseable {
bytesInCounter,
readEntryCacheMissCounter,
markDeleteCounter);
+
+ addEntryLatencyHistogram = meter
+ .histogramBuilder(ADD_ENTRY_LATENCY_HISTOGRAM)
+ .setDescription("End-to-end write latency, including time
spent in the executor queue.")
+ .setUnit("s")
+ .setExplicitBucketBoundariesAdvice(Arrays.asList(0.001, 0.005,
0.01, 0.02, 0.05, 0.1,
+ 0.2, 0.5, 1.0, 5.0, 30.0))
+ .build();
+
+ ledgerAddEntryLatencyHistogram = meter
+ .histogramBuilder(LEDGER_ADD_ENTRY_LATENCY_HISTOGRAM)
+ .setDescription("End-to end write latency.")
+ .setUnit("s")
+ .setExplicitBucketBoundariesAdvice(Arrays.asList(0.001, 0.005,
0.01, 0.02, 0.05, 0.1,
+ 0.2, 0.5, 1.0, 5.0, 30.0))
+ .build();
+
+ ledgerSwitchLatencyHistogram = meter
+ .histogramBuilder(LEDGER_SWITCH_LATENCY_HISTOGRAM)
+ .setDescription("Time taken to switch to a new ledger.")
+ .setUnit("s")
+ .setExplicitBucketBoundariesAdvice(Arrays.asList(0.001, 0.005,
0.01, 0.02, 0.05, 0.1,
+ 0.2, 0.5, 1.0, 5.0, 30.0))
+ .build();
+
+ entrySizeHistogram = meter
+ .histogramBuilder(ENTRY_SIZE_HISTOGRAM)
+ .ofLongs()
+ .setDescription("Size of entries written to the ledger.")
+ .setUnit("By")
+ .setExplicitBucketBoundariesAdvice(Arrays.asList(128L, 512L,
1024L, 2048L, 4096L, 16_384L,
+ 102_400L, 1_048_576L))
+ .build();
}
@Override
@@ -151,4 +218,24 @@ public class OpenTelemetryManagedLedgerStats implements
AutoCloseable {
markDeleteCounter.record(stats.getMarkDeleteTotal(), attributes);
readEntryCacheMissCounter.record(stats.getReadEntriesOpsCacheMissesTotal(),
attributes);
}
+
+ void recordAddEntryLatency(long latency, TimeUnit unit, ManagedLedger ml) { // records add-entry latency in seconds, namespace attributes only
+ final var attributes = ml.getManagedLedgerAttributes().getAttributesOnlyNamespace();
+ this.addEntryLatencyHistogram.record(unit.toNanos(latency) / 1_000_000_000.0, attributes); // convert via ns: toMillis would truncate sub-ms samples to 0 s
+ }
+
+ void recordLedgerAddEntryLatency(long latency, TimeUnit unit, ManagedLedger ml) { // records ledger add-entry latency in seconds, namespace attributes only
+ final var attributes = ml.getManagedLedgerAttributes().getAttributesOnlyNamespace();
+ this.ledgerAddEntryLatencyHistogram.record(unit.toNanos(latency) / 1_000_000_000.0, attributes); // convert via ns: toMillis would truncate sub-ms samples to 0 s
+ }
+
+ void recordLedgerSwitchLatency(long latency, TimeUnit unit, ManagedLedger ml) { // records ledger-switch latency in seconds, namespace attributes only
+ final var attributes = ml.getManagedLedgerAttributes().getAttributesOnlyNamespace();
+ this.ledgerSwitchLatencyHistogram.record(unit.toNanos(latency) / 1_000_000_000.0, attributes); // convert via ns: toMillis would truncate sub-ms samples to 0 s
+ }
+
+ void recordEntrySize(long entrySize, ManagedLedger ml) { // records one entry size (bytes) with namespace-only attributes
+ final var namespaceAttributes = ml.getManagedLedgerAttributes().getAttributesOnlyNamespace();
+ entrySizeHistogram.record(entrySize, namespaceAttributes);
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java
index 87e751fcc59..394af363d99 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.stats;
+import static
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricHistogramValue;
import static
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
import static
org.apache.pulsar.transaction.coordinator.impl.DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS;
import static org.assertj.core.api.Assertions.assertThat;
@@ -135,6 +136,9 @@ public class ManagedLedgerMetricsTest extends
BrokerTestBase {
OpenTelemetryAttributes.ML_NAME, mlName,
OpenTelemetryAttributes.PULSAR_NAMESPACE,
topicNameObj.getNamespace()
);
+ final var attribOnlyNamespace = Attributes.of(
+ OpenTelemetryAttributes.PULSAR_NAMESPACE,
topicNameObj.getNamespace()
+ );
var metricReader = pulsarTestContext.getOpenTelemetryMetricReader();
Awaitility.await().untilAsserted(() -> {
@@ -189,6 +193,16 @@ public class ManagedLedgerMetricsTest extends
BrokerTestBase {
value -> assertThat(value).isGreaterThanOrEqualTo(0));
assertMetricLongSumValue(otelMetrics,
OpenTelemetryManagedLedgerStats.READ_ENTRY_CACHE_MISS_COUNTER,
attribCommon, value ->
assertThat(value).isGreaterThanOrEqualTo(0));
+
+ assertMetricHistogramValue(otelMetrics,
OpenTelemetryManagedLedgerStats.ADD_ENTRY_LATENCY_HISTOGRAM,
+ attribOnlyNamespace, count ->
assertThat(count).isEqualTo(15L),
+ sum -> assertThat(sum).isGreaterThan(0.0));
+ assertMetricHistogramValue(otelMetrics,
OpenTelemetryManagedLedgerStats.LEDGER_ADD_ENTRY_LATENCY_HISTOGRAM,
+ attribOnlyNamespace, count ->
assertThat(count).isEqualTo(15L),
+ sum -> assertThat(sum).isGreaterThan(0.0));
+ assertMetricHistogramValue(otelMetrics,
OpenTelemetryManagedLedgerStats.ENTRY_SIZE_HISTOGRAM,
+ attribOnlyNamespace, count ->
assertThat(count).isEqualTo(15L),
+ sum -> assertThat(sum).isGreaterThan(0.0));
});
}