This is an automated email from the ASF dual-hosted git repository.
heesung pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f4d1d05ee38 [feat][broker] PIP-264: Add OpenTelemetry managed ledger
metrics (#22987)
f4d1d05ee38 is described below
commit f4d1d05ee385bd730cbb4fa09a287614a00400a3
Author: Dragos Misca <[email protected]>
AuthorDate: Wed Jul 3 12:25:39 2024 -0700
[feat][broker] PIP-264: Add OpenTelemetry managed ledger metrics (#22987)
---
.../apache/bookkeeper/mledger/ManagedLedger.java | 7 +
.../mledger/ManagedLedgerAttributes.java | 57 ++++++++
.../bookkeeper/mledger/ManagedLedgerMXBean.java | 35 +++++
.../mledger/impl/ManagedLedgerFactoryImpl.java | 3 +
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 6 +
.../mledger/impl/ManagedLedgerMBeanImpl.java | 35 +++++
.../impl/OpenTelemetryManagedLedgerStats.java | 153 +++++++++++++++++++++
.../broker/stats/ManagedLedgerMetricsTest.java | 100 +++++++++++++-
.../opentelemetry/OpenTelemetryAttributes.java | 17 +++
9 files changed, 406 insertions(+), 7 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index 955a0d78502..a9242d5cc65 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -696,4 +696,11 @@ public interface ManagedLedger {
* Check if managed ledger should cache backlog reads.
*/
void checkCursorsToCacheEntries();
+
+ /**
+ * Get managed ledger attributes.
+ */
+ default ManagedLedgerAttributes getManagedLedgerAttributes() {
+ return new ManagedLedgerAttributes(this);
+ }
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerAttributes.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerAttributes.java
new file mode 100644
index 00000000000..c3759a533a5
--- /dev/null
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerAttributes.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger;
+
+import io.opentelemetry.api.common.Attributes;
+import lombok.Getter;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
+import
org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.ManagedLedgerOperationStatus;
+
+@Getter
+public class ManagedLedgerAttributes {
+
+ private final Attributes attributes;
+ private final Attributes attributesOperationSucceed;
+ private final Attributes attributesOperationFailure;
+
+ public ManagedLedgerAttributes(ManagedLedger ml) {
+ var mlName = ml.getName();
+ attributes = Attributes.of(
+ OpenTelemetryAttributes.ML_NAME, mlName,
+ OpenTelemetryAttributes.PULSAR_NAMESPACE, getNamespace(mlName)
+ );
+ attributesOperationSucceed = Attributes.builder()
+ .putAll(attributes)
+ .putAll(ManagedLedgerOperationStatus.SUCCESS.attributes)
+ .build();
+ attributesOperationFailure = Attributes.builder()
+ .putAll(attributes)
+ .putAll(ManagedLedgerOperationStatus.FAILURE.attributes)
+ .build();
+ }
+
+ private static String getNamespace(String mlName) {
+ try {
+ return
TopicName.get(TopicName.fromPersistenceNamingEncoding(mlName)).getNamespace();
+ } catch (RuntimeException e) {
+ return null;
+ }
+ }
+}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java
index 44345c430b7..1d978e23785 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java
@@ -60,11 +60,21 @@ public interface ManagedLedgerMXBean {
*/
double getAddEntryBytesRate();
+ /**
+ * @return the total number of bytes written
+ */
+ long getAddEntryBytesTotal();
+
/**
* @return the bytes/s rate of messages added with replicas
*/
double getAddEntryWithReplicasBytesRate();
+ /**
+ * @return the total number of bytes written, including replicas
+ */
+ long getAddEntryWithReplicasBytesTotal();
+
/**
* @return the msg/s rate of messages read
*/
@@ -75,11 +85,21 @@ public interface ManagedLedgerMXBean {
*/
double getReadEntriesBytesRate();
+ /**
+ * @return the total number of bytes read
+ */
+ long getReadEntriesBytesTotal();
+
/**
* @return the rate of mark-delete ops/s
*/
double getMarkDeleteRate();
+ /**
+ * @return the number of mark-delete ops
+ */
+ long getMarkDeleteTotal();
+
/**
* @return the number of addEntry requests that succeeded
*/
@@ -95,6 +115,11 @@ public interface ManagedLedgerMXBean {
*/
long getAddEntryErrors();
+ /**
+ * @return the total number of addEntry requests that failed
+ */
+ long getAddEntryErrorsTotal();
+
/**
* @return the number of entries read from the managed ledger (from cache
or BK)
*/
@@ -115,11 +140,21 @@ public interface ManagedLedgerMXBean {
*/
long getReadEntriesErrors();
+ /**
+ * @return the total number of readEntries requests that failed
+ */
+ long getReadEntriesErrorsTotal();
+
/**
* @return the number of readEntries requests that cache miss Rate
*/
double getReadEntriesOpsCacheMissesRate();
+ /**
+ * @return the total number of readEntries requests that cache miss
+ */
+ long getReadEntriesOpsCacheMissesTotal();
+
// Entry size statistics
double getEntrySizeAverage();
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index fc291b801c8..b1939f40e93 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -121,6 +121,7 @@ public class ManagedLedgerFactoryImpl implements
ManagedLedgerFactory {
private final MetadataStore metadataStore;
private final OpenTelemetryManagedLedgerCacheStats openTelemetryCacheStats;
+ private final OpenTelemetryManagedLedgerStats
openTelemetryManagedLedgerStats;
//indicate whether shutdown() is called.
private volatile boolean closed;
@@ -229,6 +230,7 @@ public class ManagedLedgerFactoryImpl implements
ManagedLedgerFactory {
metadataStore.registerSessionListener(this::handleMetadataStoreNotification);
openTelemetryCacheStats = new
OpenTelemetryManagedLedgerCacheStats(openTelemetry, this);
+ openTelemetryManagedLedgerStats = new
OpenTelemetryManagedLedgerStats(openTelemetry, this);
}
static class DefaultBkFactory implements
BookkeeperFactoryForCustomEnsemblePlacementPolicy {
@@ -620,6 +622,7 @@ public class ManagedLedgerFactoryImpl implements
ManagedLedgerFactory {
}));
}).thenAcceptAsync(__ -> {
//wait for tasks in scheduledExecutor executed.
+ openTelemetryManagedLedgerStats.close();
openTelemetryCacheStats.close();
scheduledExecutor.shutdownNow();
entryCacheManager.clear();
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 8d1919dd052..b7734906f75 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -96,6 +96,7 @@ import
org.apache.bookkeeper.mledger.AsyncCallbacks.UpdatePropertiesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerAttributes;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import
org.apache.bookkeeper.mledger.ManagedLedgerException.BadVersionException;
@@ -326,6 +327,9 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
*/
final ConcurrentLinkedQueue<OpAddEntry> pendingAddEntries = new
ConcurrentLinkedQueue<>();
+ @Getter
+ private final ManagedLedgerAttributes managedLedgerAttributes;
+
/**
* This variable is used for testing the tests.
* ManagedLedgerTest#testManagedLedgerWithPlacementPolicyInCustomMetadata()
@@ -338,6 +342,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
final String name) {
this(factory, bookKeeper, store, config, scheduledExecutor, name,
null);
}
+
public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper
bookKeeper, MetaStore store,
ManagedLedgerConfig config, OrderedScheduler scheduledExecutor,
final String name, final Supplier<CompletableFuture<Boolean>>
mlOwnershipChecker) {
@@ -373,6 +378,7 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
this.minBacklogCursorsForCaching =
config.getMinimumBacklogCursorsForCaching();
this.minBacklogEntriesForCaching =
config.getMinimumBacklogEntriesForCaching();
this.maxBacklogBetweenCursorsForCaching =
config.getMaxBacklogBetweenCursorsForCaching();
+ this.managedLedgerAttributes = new ManagedLedgerAttributes(this);
}
synchronized void initialize(final ManagedLedgerInitializeLedgerCallback
callback, final Object ctx) {
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
index 5e5161a29ca..86320f92924 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
@@ -210,11 +210,21 @@ public class ManagedLedgerMBeanImpl implements
ManagedLedgerMXBean {
return addEntryOps.getValueRate();
}
+ @Override
+ public long getAddEntryBytesTotal() {
+ return addEntryOps.getTotalValue();
+ }
+
@Override
public double getAddEntryWithReplicasBytesRate() {
return addEntryWithReplicasOps.getValueRate();
}
+ @Override
+ public long getAddEntryWithReplicasBytesTotal() {
+ return addEntryWithReplicasOps.getTotalValue();
+ }
+
@Override
public double getReadEntriesRate() {
return readEntriesOps.getRate();
@@ -225,6 +235,11 @@ public class ManagedLedgerMBeanImpl implements
ManagedLedgerMXBean {
return readEntriesOps.getValueRate();
}
+ @Override
+ public long getReadEntriesBytesTotal() {
+ return readEntriesOps.getTotalValue();
+ }
+
@Override
public long getAddEntrySucceed() {
return addEntryOps.getCount();
@@ -240,6 +255,11 @@ public class ManagedLedgerMBeanImpl implements
ManagedLedgerMXBean {
return addEntryOpsFailed.getCount();
}
+ @Override
+ public long getAddEntryErrorsTotal() {
+ return addEntryOpsFailed.getTotalCount();
+ }
+
@Override
public long getReadEntriesSucceeded() {
return readEntriesOps.getCount();
@@ -255,16 +275,31 @@ public class ManagedLedgerMBeanImpl implements
ManagedLedgerMXBean {
return readEntriesOpsFailed.getCount();
}
+ @Override
+ public long getReadEntriesErrorsTotal() {
+ return readEntriesOpsFailed.getTotalCount();
+ }
+
@Override
public double getReadEntriesOpsCacheMissesRate() {
return readEntriesOpsCacheMisses.getRate();
}
+ @Override
+ public long getReadEntriesOpsCacheMissesTotal() {
+ return readEntriesOpsCacheMisses.getTotalCount();
+ }
+
@Override
public double getMarkDeleteRate() {
return markDeleteOps.getRate();
}
+ @Override
+ public long getMarkDeleteTotal() {
+ return markDeleteOps.getTotalCount();
+ }
+
@Override
public double getEntrySizeAverage() {
return entryStats.getAvg();
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedLedgerStats.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedLedgerStats.java
new file mode 100644
index 00000000000..f7b9d91dff6
--- /dev/null
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedLedgerStats.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.metrics.BatchCallback;
+import io.opentelemetry.api.metrics.ObservableLongMeasurement;
+import org.apache.pulsar.opentelemetry.Constants;
+
+public class OpenTelemetryManagedLedgerStats implements AutoCloseable {
+
+ // Replaces pulsar_ml_AddEntryMessagesRate
+ public static final String ADD_ENTRY_COUNTER =
"pulsar.broker.managed_ledger.message.outgoing.count";
+ private final ObservableLongMeasurement addEntryCounter;
+
+ // Replaces pulsar_ml_AddEntryBytesRate
+ public static final String BYTES_OUT_COUNTER =
"pulsar.broker.managed_ledger.message.outgoing.logical.size";
+ private final ObservableLongMeasurement bytesOutCounter;
+
+ // Replaces pulsar_ml_AddEntryWithReplicasBytesRate
+ public static final String BYTES_OUT_WITH_REPLICAS_COUNTER =
+ "pulsar.broker.managed_ledger.message.outgoing.replicated.size";
+ private final ObservableLongMeasurement bytesOutWithReplicasCounter;
+
+ // Replaces pulsar_ml_NumberOfMessagesInBacklog
+ public static final String BACKLOG_COUNTER =
"pulsar.broker.managed_ledger.backlog.count";
+ private final ObservableLongMeasurement backlogCounter;
+
+ // Replaces pulsar_ml_ReadEntriesRate
+ public static final String READ_ENTRY_COUNTER =
"pulsar.broker.managed_ledger.message.incoming.count";
+ private final ObservableLongMeasurement readEntryCounter;
+
+ // Replaces pulsar_ml_ReadEntriesBytesRate
+ public static final String BYTES_IN_COUNTER =
"pulsar.broker.managed_ledger.message.incoming.size";
+ private final ObservableLongMeasurement bytesInCounter;
+
+ // Replaces brk_ml_ReadEntriesOpsCacheMissesRate
+ public static final String READ_ENTRY_CACHE_MISS_COUNTER =
+ "pulsar.broker.managed_ledger.message.incoming.cache.miss.count";
+ private final ObservableLongMeasurement readEntryCacheMissCounter;
+
+ // Replaces pulsar_ml_MarkDeleteRate
+ public static final String MARK_DELETE_COUNTER =
"pulsar.broker.managed_ledger.mark_delete.count";
+ private final ObservableLongMeasurement markDeleteCounter;
+
+ private final BatchCallback batchCallback;
+
+ public OpenTelemetryManagedLedgerStats(OpenTelemetry openTelemetry,
ManagedLedgerFactoryImpl factory) {
+ var meter =
openTelemetry.getMeter(Constants.BROKER_INSTRUMENTATION_SCOPE_NAME);
+
+ addEntryCounter = meter
+ .upDownCounterBuilder(ADD_ENTRY_COUNTER)
+ .setUnit("{operation}")
+ .setDescription("The number of write operations to this
ledger.")
+ .buildObserver();
+
+ bytesOutCounter = meter
+ .counterBuilder(BYTES_OUT_COUNTER)
+ .setUnit("By")
+ .setDescription("The total number of messages bytes written to
this ledger, excluding replicas.")
+ .buildObserver();
+
+ bytesOutWithReplicasCounter = meter
+ .counterBuilder(BYTES_OUT_WITH_REPLICAS_COUNTER)
+ .setUnit("By")
+ .setDescription("The total number of messages bytes written to
this ledger, including replicas.")
+ .buildObserver();
+
+ backlogCounter = meter
+ .upDownCounterBuilder(BACKLOG_COUNTER)
+ .setUnit("{message}")
+ .setDescription("The number of messages in backlog for all
consumers from this ledger.")
+ .buildObserver();
+
+ readEntryCounter = meter
+ .upDownCounterBuilder(READ_ENTRY_COUNTER)
+ .setUnit("{operation}")
+ .setDescription("The number of read operations from this
ledger.")
+ .buildObserver();
+
+ bytesInCounter = meter
+ .counterBuilder(BYTES_IN_COUNTER)
+ .setUnit("By")
+ .setDescription("The total number of messages bytes read from
this ledger.")
+ .buildObserver();
+
+ readEntryCacheMissCounter = meter
+ .upDownCounterBuilder(READ_ENTRY_CACHE_MISS_COUNTER)
+ .setUnit("{operation}")
+ .setDescription("The number of cache misses during read
operations from this ledger.")
+ .buildObserver();
+
+ markDeleteCounter = meter
+ .counterBuilder(MARK_DELETE_COUNTER)
+ .setUnit("{operation}")
+ .setDescription("The total number of mark delete operations
for this ledger.")
+ .buildObserver();
+
+ batchCallback = meter.batchCallback(() -> factory.getManagedLedgers()
+ .values()
+ .forEach(this::recordMetrics),
+ addEntryCounter,
+ bytesOutCounter,
+ bytesOutWithReplicasCounter,
+ backlogCounter,
+ readEntryCounter,
+ bytesInCounter,
+ readEntryCacheMissCounter,
+ markDeleteCounter);
+ }
+
+ @Override
+ public void close() {
+ batchCallback.close();
+ }
+
+ private void recordMetrics(ManagedLedgerImpl ml) {
+ var stats = ml.getMbean();
+ var ledgerAttributeSet = ml.getManagedLedgerAttributes();
+ var attributes = ledgerAttributeSet.getAttributes();
+ var attributesSucceed =
ledgerAttributeSet.getAttributesOperationSucceed();
+ var attributesFailure =
ledgerAttributeSet.getAttributesOperationFailure();
+
+ addEntryCounter.record(stats.getAddEntrySucceedTotal(),
attributesSucceed);
+ addEntryCounter.record(stats.getAddEntryErrorsTotal(),
attributesFailure);
+ bytesOutCounter.record(stats.getAddEntryBytesTotal(), attributes);
+
bytesOutWithReplicasCounter.record(stats.getAddEntryWithReplicasBytesTotal(),
attributes);
+
+ readEntryCounter.record(stats.getReadEntriesSucceededTotal(),
attributesSucceed);
+ readEntryCounter.record(stats.getReadEntriesErrorsTotal(),
attributesFailure);
+ bytesInCounter.record(stats.getReadEntriesBytesTotal(), attributes);
+
+ backlogCounter.record(stats.getNumberOfMessagesInBacklog(),
attributes);
+ markDeleteCounter.record(stats.getMarkDeleteTotal(), attributes);
+
readEntryCacheMissCounter.record(stats.getReadEntriesOpsCacheMissesTotal(),
attributes);
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java
index bec73121e48..b9c0ab08e4e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java
@@ -18,27 +18,38 @@
*/
package org.apache.pulsar.broker.stats;
+import static
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
import static
org.apache.pulsar.transaction.coordinator.impl.DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS;
+import static org.assertj.core.api.Assertions.assertThat;
+import com.google.common.collect.Sets;
import io.netty.util.HashedWheelTimer;
import io.netty.util.concurrent.DefaultThreadFactory;
+import io.opentelemetry.api.common.Attributes;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
-import com.google.common.collect.Sets;
+import lombok.Cleanup;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
+import org.apache.bookkeeper.mledger.impl.OpenTelemetryManagedLedgerStats;
+import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.stats.metrics.ManagedLedgerMetrics;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
import
org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig;
+import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -68,6 +79,12 @@ public class ManagedLedgerMetricsTest extends BrokerTestBase
{
super.internalCleanup();
}
+ @Override
+ protected void
customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder
pulsarTestContextBuilder) {
+ super.customizeMainPulsarTestContextBuilder(pulsarTestContextBuilder);
+ pulsarTestContextBuilder.enableOpenTelemetry(true);
+ }
+
@Test
public void testManagedLedgerMetrics() throws Exception {
ManagedLedgerMetrics metrics = new ManagedLedgerMetrics(pulsar);
@@ -76,15 +93,20 @@ public class ManagedLedgerMetricsTest extends
BrokerTestBase {
List<Metrics> list1 = metrics.generate();
Assert.assertTrue(list1.isEmpty());
- Producer<byte[]> producer =
pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1")
- .create();
+ var topicName = "persistent://my-property/use/my-ns/my-topic1";
+ @Cleanup
+ Producer<byte[]> producer =
pulsarClient.newProducer().topic(topicName).create();
+
+ @Cleanup
+ var consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName("sub1").subscribe();
+
for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}
- for (Entry<String, ManagedLedgerImpl> ledger :
((ManagedLedgerFactoryImpl) pulsar.getManagedLedgerFactory())
- .getManagedLedgers().entrySet()) {
+ var managedLedgerFactory = (ManagedLedgerFactoryImpl)
pulsar.getManagedLedgerFactory();
+ for (Entry<String, ManagedLedgerImpl> ledger :
managedLedgerFactory.getManagedLedgers().entrySet()) {
ManagedLedgerMBeanImpl stats = (ManagedLedgerMBeanImpl)
ledger.getValue().getStats();
stats.refreshStats(1, TimeUnit.SECONDS);
}
@@ -96,14 +118,78 @@ public class ManagedLedgerMetricsTest extends
BrokerTestBase {
String message = "my-message-" + i;
producer.send(message.getBytes());
}
- for (Entry<String, ManagedLedgerImpl> ledger :
((ManagedLedgerFactoryImpl) pulsar.getManagedLedgerFactory())
- .getManagedLedgers().entrySet()) {
+ for (Entry<String, ManagedLedgerImpl> ledger :
managedLedgerFactory.getManagedLedgers().entrySet()) {
ManagedLedgerMBeanImpl stats = (ManagedLedgerMBeanImpl)
ledger.getValue().getStats();
stats.refreshStats(1, TimeUnit.SECONDS);
}
List<Metrics> list3 = metrics.generate();
Assert.assertEquals(list3.get(0).getMetrics().get(addEntryRateKey),
5.0D);
+ // Validate OpenTelemetry metrics.
+ var ledgers = managedLedgerFactory.getManagedLedgers();
+ var topicNameObj = TopicName.get(topicName);
+ var mlName = topicNameObj.getPersistenceNamingEncoding();
+ assertThat(ledgers).containsKey(mlName);
+ var ml = ledgers.get(mlName);
+ var attribCommon = Attributes.of(
+ OpenTelemetryAttributes.ML_NAME, mlName,
+ OpenTelemetryAttributes.PULSAR_NAMESPACE,
topicNameObj.getNamespace()
+ );
+ var metricReader = pulsarTestContext.getOpenTelemetryMetricReader();
+
+ Awaitility.await().untilAsserted(() -> {
+ var otelMetrics = metricReader.collectAllMetrics();
+ assertMetricLongSumValue(otelMetrics,
OpenTelemetryManagedLedgerStats.BACKLOG_COUNTER, attribCommon, 15);
+ assertMetricLongSumValue(otelMetrics,
OpenTelemetryManagedLedgerStats.MARK_DELETE_COUNTER, attribCommon, 0);
+ });
+
+ for (int i = 0; i < 10; i++) {
+ var msg = consumer.receive(1, TimeUnit.SECONDS);
+ consumer.acknowledge(msg);
+ }
+
+ Awaitility.await().untilAsserted(() -> {
+ var otelMetrics = metricReader.collectAllMetrics();
+ assertMetricLongSumValue(otelMetrics,
OpenTelemetryManagedLedgerStats.BACKLOG_COUNTER, attribCommon, 5);
+ assertMetricLongSumValue(otelMetrics,
OpenTelemetryManagedLedgerStats.MARK_DELETE_COUNTER, attribCommon,
+ value -> assertThat(value).isPositive());
+ });
+
+ Awaitility.await().untilAsserted(() -> {
+ @Cleanup
+ var cons = pulsarClient.newConsumer()
+ .topic(topicName)
+ .subscriptionName(BrokerTestUtil.newUniqueName("sub"))
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+ cons.receive(1, TimeUnit.SECONDS);
+
+ var attribSucceed = Attributes.of(
+ OpenTelemetryAttributes.ML_NAME, mlName,
+ OpenTelemetryAttributes.PULSAR_NAMESPACE,
topicNameObj.getNamespace(),
+ OpenTelemetryAttributes.ML_OPERATION_STATUS, "success"
+ );
+ var attribFailed = Attributes.of(
+ OpenTelemetryAttributes.ML_NAME, mlName,
+ OpenTelemetryAttributes.PULSAR_NAMESPACE,
topicNameObj.getNamespace(),
+ OpenTelemetryAttributes.ML_OPERATION_STATUS, "failure"
+ );
+ var otelMetrics = metricReader.collectAllMetrics();
+ assertMetricLongSumValue(otelMetrics,
OpenTelemetryManagedLedgerStats.ADD_ENTRY_COUNTER, attribSucceed, 15);
+ assertMetricLongSumValue(otelMetrics,
OpenTelemetryManagedLedgerStats.ADD_ENTRY_COUNTER, attribFailed, 0);
+ assertMetricLongSumValue(otelMetrics,
OpenTelemetryManagedLedgerStats.BYTES_OUT_COUNTER, attribCommon,
+ value -> assertThat(value).isPositive());
+ assertMetricLongSumValue(otelMetrics,
OpenTelemetryManagedLedgerStats.BYTES_OUT_WITH_REPLICAS_COUNTER,
+ attribCommon, value -> assertThat(value).isPositive());
+
+ assertMetricLongSumValue(otelMetrics,
OpenTelemetryManagedLedgerStats.READ_ENTRY_COUNTER, attribSucceed,
+ value -> assertThat(value).isPositive());
+ assertMetricLongSumValue(otelMetrics,
OpenTelemetryManagedLedgerStats.READ_ENTRY_COUNTER, attribFailed, 0);
+ assertMetricLongSumValue(otelMetrics,
OpenTelemetryManagedLedgerStats.BYTES_IN_COUNTER, attribCommon,
+ value -> assertThat(value).isPositive());
+ assertMetricLongSumValue(otelMetrics,
OpenTelemetryManagedLedgerStats.READ_ENTRY_CACHE_MISS_COUNTER,
+ attribCommon, value -> assertThat(value).isPositive());
+ });
}
@Test
diff --git
a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
index f485e300926..24dd1be8509 100644
---
a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
+++
b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
@@ -196,6 +196,23 @@ public interface OpenTelemetryAttributes {
public final Attributes attributes =
Attributes.of(PULSAR_CONNECTION_CREATE_STATUS, name().toLowerCase());
}
+ // Managed Ledger Attributes
+
+ /**
+ * The name of the managed ledger.
+ */
+ AttributeKey<String> ML_NAME =
AttributeKey.stringKey("pulsar.managed_ledger.name");
+
+ /**
+ * The status of the managed ledger operation.
+ */
+ AttributeKey<String> ML_OPERATION_STATUS =
AttributeKey.stringKey("pulsar.managed_ledger.operation.status");
+ enum ManagedLedgerOperationStatus {
+ SUCCESS,
+ FAILURE;
+ public final Attributes attributes =
Attributes.of(ML_OPERATION_STATUS, name().toLowerCase());
+ };
+
/**
* The type of the pool arena.
*/