This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8351c079d8e [feat][broker] PIP-264: Add OpenTelemetry managed cursor
metrics (#23000)
8351c079d8e is described below
commit 8351c079d8e8b162f964ed6a735edf76459070ec
Author: Dragos Misca <[email protected]>
AuthorDate: Fri Jul 5 02:45:55 2024 -0700
[feat][broker] PIP-264: Add OpenTelemetry managed cursor metrics (#23000)
---
.../apache/bookkeeper/mledger/ManagedCursor.java | 8 ++
.../mledger/ManagedCursorAttributes.java | 51 ++++++++
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 14 +++
.../mledger/impl/ManagedLedgerFactoryImpl.java | 3 +
.../impl/OpenTelemetryManagedCursorStats.java | 136 +++++++++++++++++++++
.../broker/stats/ManagedCursorMetricsTest.java | 98 +++++++++++++--
.../opentelemetry/OpenTelemetryAttributes.java | 23 ++++
7 files changed, 321 insertions(+), 12 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index 4aa3226a4dc..f6345e7b9ec 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -877,4 +877,12 @@ public interface ManagedCursor {
return false;
}
+ /**
+ * Get the attributes associated with the cursor.
+ *
+ * @return the attributes associated with the cursor
+ */
+ default ManagedCursorAttributes getManagedCursorAttributes() {
+ return new ManagedCursorAttributes(this);
+ }
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursorAttributes.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursorAttributes.java
new file mode 100644
index 00000000000..6c06e68d75e
--- /dev/null
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursorAttributes.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger;
+
+import io.opentelemetry.api.common.Attributes;
+import lombok.Getter;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
+import
org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.ManagedCursorOperationStatus;
+
+@Getter
+public class ManagedCursorAttributes {
+
+ private final Attributes attributes;
+ private final Attributes attributesOperationSucceed;
+ private final Attributes attributesOperationFailure;
+
+ public ManagedCursorAttributes(ManagedCursor cursor) {
+ var mlName = cursor.getManagedLedger().getName();
+ var topicName =
TopicName.get(TopicName.fromPersistenceNamingEncoding(mlName));
+ attributes = Attributes.of(
+ OpenTelemetryAttributes.ML_CURSOR_NAME, cursor.getName(),
+ OpenTelemetryAttributes.ML_LEDGER_NAME, mlName,
+ OpenTelemetryAttributes.PULSAR_NAMESPACE,
topicName.getNamespace()
+ );
+ attributesOperationSucceed = Attributes.builder()
+ .putAll(attributes)
+ .putAll(ManagedCursorOperationStatus.SUCCESS.attributes)
+ .build();
+ attributesOperationFailure = Attributes.builder()
+ .putAll(attributes)
+ .putAll(ManagedCursorOperationStatus.FAILURE.attributes)
+ .build();
+ }
+}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 98ba722ba1c..4ef9678f3e1 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -77,6 +77,7 @@ import
org.apache.bookkeeper.mledger.AsyncCallbacks.ScanCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.SkipEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedCursorAttributes;
import org.apache.bookkeeper.mledger.ManagedCursorMXBean;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
@@ -286,6 +287,11 @@ public class ManagedCursorImpl implements ManagedCursor {
protected final ManagedCursorMXBean mbean;
+ private volatile ManagedCursorAttributes managedCursorAttributes;
+ private static final AtomicReferenceFieldUpdater<ManagedCursorImpl,
ManagedCursorAttributes> ATTRIBUTES_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class,
ManagedCursorAttributes.class,
+ "managedCursorAttributes");
+
@SuppressWarnings("checkstyle:javadoctype")
public interface VoidCallback {
void operationComplete();
@@ -3719,4 +3725,12 @@ public class ManagedCursorImpl implements ManagedCursor {
}
return newNonDurableCursor;
}
+
+ @Override
+ public ManagedCursorAttributes getManagedCursorAttributes() {
+ if (managedCursorAttributes != null) {
+ return managedCursorAttributes;
+ }
+ return ATTRIBUTES_UPDATER.updateAndGet(this, old -> old != null ? old
: new ManagedCursorAttributes(this));
+ }
}
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index b1939f40e93..00afb85a9d4 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -122,6 +122,7 @@ public class ManagedLedgerFactoryImpl implements
ManagedLedgerFactory {
private final OpenTelemetryManagedLedgerCacheStats openTelemetryCacheStats;
private final OpenTelemetryManagedLedgerStats
openTelemetryManagedLedgerStats;
+ private final OpenTelemetryManagedCursorStats
openTelemetryManagedCursorStats;
//indicate whether shutdown() is called.
private volatile boolean closed;
@@ -231,6 +232,7 @@ public class ManagedLedgerFactoryImpl implements
ManagedLedgerFactory {
openTelemetryCacheStats = new
OpenTelemetryManagedLedgerCacheStats(openTelemetry, this);
openTelemetryManagedLedgerStats = new
OpenTelemetryManagedLedgerStats(openTelemetry, this);
+ openTelemetryManagedCursorStats = new
OpenTelemetryManagedCursorStats(openTelemetry, this);
}
static class DefaultBkFactory implements
BookkeeperFactoryForCustomEnsemblePlacementPolicy {
@@ -622,6 +624,7 @@ public class ManagedLedgerFactoryImpl implements
ManagedLedgerFactory {
}));
}).thenAcceptAsync(__ -> {
//wait for tasks in scheduledExecutor executed.
+ openTelemetryManagedCursorStats.close();
openTelemetryManagedLedgerStats.close();
openTelemetryCacheStats.close();
scheduledExecutor.shutdownNow();
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedCursorStats.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedCursorStats.java
new file mode 100644
index 00000000000..93a749d4aef
--- /dev/null
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedCursorStats.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl;
+
+import com.google.common.collect.Streams;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.metrics.BatchCallback;
+import io.opentelemetry.api.metrics.ObservableLongMeasurement;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.pulsar.opentelemetry.Constants;
+
+public class OpenTelemetryManagedCursorStats implements AutoCloseable {
+
+ // Replaces ['pulsar_ml_cursor_persistLedgerSucceed',
'pulsar_ml_cursor_persistLedgerErrors']
+ public static final String PERSIST_OPERATION_COUNTER =
"pulsar.broker.managed_ledger.persist.operation.count";
+ private final ObservableLongMeasurement persistOperationCounter;
+
+ // Replaces ['pulsar_ml_cursor_persistZookeeperSucceed',
'pulsar_ml_cursor_persistZookeeperErrors']
+ public static final String PERSIST_OPERATION_METADATA_STORE_COUNTER =
+ "pulsar.broker.managed_ledger.persist.mds.operation.count";
+ private final ObservableLongMeasurement
persistOperationMetadataStoreCounter;
+
+ // Replaces pulsar_ml_cursor_nonContiguousDeletedMessagesRange
+ public static final String NON_CONTIGUOUS_MESSAGE_RANGE_COUNTER =
+ "pulsar.broker.managed_ledger.message_range.count";
+ private final ObservableLongMeasurement nonContiguousMessageRangeCounter;
+
+ // Replaces pulsar_ml_cursor_writeLedgerSize
+ public static final String OUTGOING_BYTE_COUNTER =
"pulsar.broker.managed_ledger.cursor.outgoing.size";
+ private final ObservableLongMeasurement outgoingByteCounter;
+
+ // Replaces pulsar_ml_cursor_writeLedgerLogicalSize
+ public static final String OUTGOING_BYTE_LOGICAL_COUNTER =
+ "pulsar.broker.managed_ledger.cursor.outgoing.logical.size";
+ private final ObservableLongMeasurement outgoingByteLogicalCounter;
+
+ // Replaces pulsar_ml_cursor_readLedgerSize
+ public static final String INCOMING_BYTE_COUNTER =
"pulsar.broker.managed_ledger.cursor.incoming.size";
+ private final ObservableLongMeasurement incomingByteCounter;
+
+ private final BatchCallback batchCallback;
+
+ public OpenTelemetryManagedCursorStats(OpenTelemetry openTelemetry,
ManagedLedgerFactoryImpl factory) {
+ var meter =
openTelemetry.getMeter(Constants.BROKER_INSTRUMENTATION_SCOPE_NAME);
+
+ persistOperationCounter = meter
+ .counterBuilder(PERSIST_OPERATION_COUNTER)
+ .setUnit("{operation}")
+ .setDescription("The number of acknowledgment operations on
the ledger.")
+ .buildObserver();
+
+ persistOperationMetadataStoreCounter = meter
+ .counterBuilder(PERSIST_OPERATION_METADATA_STORE_COUNTER)
+ .setUnit("{operation}")
+ .setDescription("The number of acknowledgment operations in
the metadata store.")
+ .buildObserver();
+
+ nonContiguousMessageRangeCounter = meter
+ .upDownCounterBuilder(NON_CONTIGUOUS_MESSAGE_RANGE_COUNTER)
+ .setUnit("{range}")
+ .setDescription("The number of non-contiguous deleted messages
ranges.")
+ .buildObserver();
+
+ outgoingByteCounter = meter
+ .counterBuilder(OUTGOING_BYTE_COUNTER)
+ .setUnit("{By}")
+ .setDescription("The total amount of data written to the
ledger.")
+ .buildObserver();
+
+ outgoingByteLogicalCounter = meter
+ .counterBuilder(OUTGOING_BYTE_LOGICAL_COUNTER)
+ .setUnit("{By}")
+ .setDescription("The total amount of data written to the
ledger, not including replicas.")
+ .buildObserver();
+
+ incomingByteCounter = meter
+ .counterBuilder(INCOMING_BYTE_COUNTER)
+ .setUnit("{By}")
+ .setDescription("The total amount of data read from the
ledger.")
+ .buildObserver();
+
+ batchCallback = meter.batchCallback(() -> factory.getManagedLedgers()
+ .values()
+ .stream()
+ .map(ManagedLedgerImpl::getCursors)
+ .flatMap(Streams::stream)
+ .forEach(this::recordMetrics),
+ persistOperationCounter,
+ persistOperationMetadataStoreCounter,
+ nonContiguousMessageRangeCounter,
+ outgoingByteCounter,
+ outgoingByteLogicalCounter,
+ incomingByteCounter);
+ }
+
+ @Override
+ public void close() {
+ batchCallback.close();
+ }
+
+ private void recordMetrics(ManagedCursor cursor) {
+ var stats = cursor.getStats();
+ var cursorAttributesSet = cursor.getManagedCursorAttributes();
+ var attributes = cursorAttributesSet.getAttributes();
+ var attributesSucceed =
cursorAttributesSet.getAttributesOperationSucceed();
+ var attributesFailed =
cursorAttributesSet.getAttributesOperationFailure();
+
+ persistOperationCounter.record(stats.getPersistLedgerSucceed(),
attributesSucceed);
+ persistOperationCounter.record(stats.getPersistLedgerErrors(),
attributesFailed);
+
+
persistOperationMetadataStoreCounter.record(stats.getPersistZookeeperSucceed(),
attributesSucceed);
+
persistOperationMetadataStoreCounter.record(stats.getPersistZookeeperErrors(),
attributesFailed);
+
+
nonContiguousMessageRangeCounter.record(cursor.getTotalNonContiguousDeletedMessagesRange(),
attributes);
+
+ outgoingByteCounter.record(stats.getWriteCursorLedgerSize(),
attributes);
+
outgoingByteLogicalCounter.record(stats.getWriteCursorLedgerLogicalSize(),
attributes);
+ incomingByteCounter.record(stats.getReadCursorLedgerSize(),
attributes);
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
index baa4bea5701..8ddb5320588 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java
@@ -18,20 +18,24 @@
*/
package org.apache.pulsar.broker.stats;
+import static
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
+import static org.assertj.core.api.Assertions.assertThat;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedCursorMXBean;
-import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.bookkeeper.mledger.ManagedCursorAttributes;
+import org.apache.bookkeeper.mledger.impl.OpenTelemetryManagedCursorStats;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
@@ -80,6 +84,12 @@ public class ManagedCursorMetricsTest extends
MockedPulsarServiceBaseTest {
return PulsarTestClient.create(clientBuilder);
}
+ @Override
+ protected void
customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder
pulsarTestContextBuilder) {
+ super.customizeMainPulsarTestContextBuilder(pulsarTestContextBuilder);
+ pulsarTestContextBuilder.enableOpenTelemetry(true);
+ }
+
/***
* This method has overridden these case:
* brk_ml_cursor_persistLedgerSucceed
@@ -115,10 +125,7 @@ public class ManagedCursorMetricsTest extends
MockedPulsarServiceBaseTest {
.topic(topicName)
.enableBatching(false)
.create();
- final PersistentSubscription persistentSubscription =
- (PersistentSubscription) pulsar.getBrokerService()
- .getTopic(topicName,
false).get().get().getSubscription(subName);
- final ManagedCursorImpl managedCursor = (ManagedCursorImpl)
persistentSubscription.getCursor();
+ var managedCursor = getManagedCursor(topicName, subName);
ManagedCursorMXBean managedCursorMXBean = managedCursor.getStats();
// Assert.
metricsList = metrics.generate();
@@ -128,6 +135,19 @@ public class ManagedCursorMetricsTest extends
MockedPulsarServiceBaseTest {
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperSucceed"),
0L);
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperErrors"),
0L);
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_nonContiguousDeletedMessagesRange"),
0L);
+ // Validate OpenTelemetry metrics as well
+ var attributesSet = new ManagedCursorAttributes(managedCursor);
+ var otelMetrics =
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+ assertMetricLongSumValue(otelMetrics,
OpenTelemetryManagedCursorStats.PERSIST_OPERATION_COUNTER,
+ attributesSet.getAttributesOperationSucceed(), 0);
+ assertMetricLongSumValue(otelMetrics,
OpenTelemetryManagedCursorStats.PERSIST_OPERATION_COUNTER,
+ attributesSet.getAttributesOperationFailure(), 0);
+ assertMetricLongSumValue(otelMetrics,
OpenTelemetryManagedCursorStats.PERSIST_OPERATION_METADATA_STORE_COUNTER,
+ attributesSet.getAttributesOperationSucceed(), 0);
+ assertMetricLongSumValue(otelMetrics,
OpenTelemetryManagedCursorStats.PERSIST_OPERATION_METADATA_STORE_COUNTER,
+ attributesSet.getAttributesOperationFailure(), 0);
+ assertMetricLongSumValue(otelMetrics,
OpenTelemetryManagedCursorStats.NON_CONTIGUOUS_MESSAGE_RANGE_COUNTER,
+ attributesSet.getAttributes(), 0);
/**
* 1. Send many messages, and only ack half. After the cursor data is
written to BK,
* verify "brk_ml_cursor_persistLedgerSucceed" and
"brk_ml_cursor_nonContiguousDeletedMessagesRange".
@@ -156,6 +176,17 @@ public class ManagedCursorMetricsTest extends
MockedPulsarServiceBaseTest {
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperErrors"),
0L);
Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_nonContiguousDeletedMessagesRange"),
0L);
+ otelMetrics =
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+ assertMetricLongSumValue(otelMetrics,
OpenTelemetryManagedCursorStats.PERSIST_OPERATION_COUNTER,
+ attributesSet.getAttributesOperationSucceed(), value ->
assertThat(value).isPositive());
+ assertMetricLongSumValue(otelMetrics,
OpenTelemetryManagedCursorStats.PERSIST_OPERATION_COUNTER,
+ attributesSet.getAttributesOperationFailure(), 0);
+ assertMetricLongSumValue(otelMetrics,
OpenTelemetryManagedCursorStats.PERSIST_OPERATION_METADATA_STORE_COUNTER,
+ attributesSet.getAttributesOperationSucceed(), 0);
+ assertMetricLongSumValue(otelMetrics,
OpenTelemetryManagedCursorStats.PERSIST_OPERATION_METADATA_STORE_COUNTER,
+ attributesSet.getAttributesOperationFailure(), 0);
+ assertMetricLongSumValue(otelMetrics,
OpenTelemetryManagedCursorStats.NON_CONTIGUOUS_MESSAGE_RANGE_COUNTER,
+ attributesSet.getAttributes(), value ->
assertThat(value).isPositive());
// Ack another half.
for (MessageId messageId : keepsMessageIdList){
consumer.acknowledge(messageId);
@@ -171,6 +202,17 @@ public class ManagedCursorMetricsTest extends
MockedPulsarServiceBaseTest {
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperSucceed"),
0L);
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperErrors"),
0L);
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_nonContiguousDeletedMessagesRange"),
0L);
+ otelMetrics =
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+ assertMetricLongSumValue(otelMetrics,
OpenTelemetryManagedCursorStats.PERSIST_OPERATION_COUNTER,
+ attributesSet.getAttributesOperationSucceed(), value ->
assertThat(value).isPositive());
+ assertMetricLongSumValue(otelMetrics,
OpenTelemetryManagedCursorStats.PERSIST_OPERATION_COUNTER,
+ attributesSet.getAttributesOperationFailure(), 0);
+ assertMetricLongSumValue(otelMetrics,
OpenTelemetryManagedCursorStats.PERSIST_OPERATION_METADATA_STORE_COUNTER,
+ attributesSet.getAttributesOperationSucceed(), 0);
+ assertMetricLongSumValue(otelMetrics,
OpenTelemetryManagedCursorStats.PERSIST_OPERATION_METADATA_STORE_COUNTER,
+ attributesSet.getAttributesOperationFailure(), 0);
+ assertMetricLongSumValue(otelMetrics,
OpenTelemetryManagedCursorStats.NON_CONTIGUOUS_MESSAGE_RANGE_COUNTER,
+ attributesSet.getAttributes(), 0);
/**
* Make BK error, and send many message, then wait cursor persistent
finish.
* After the cursor data is written to ZK, verify
"brk_ml_cursor_persistLedgerErrors" and
@@ -196,6 +238,17 @@ public class ManagedCursorMetricsTest extends
MockedPulsarServiceBaseTest {
Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperSucceed"),
0L);
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_persistZookeeperErrors"),
0L);
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_nonContiguousDeletedMessagesRange"),
0L);
+ otelMetrics =
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+ assertMetricLongSumValue(otelMetrics,
OpenTelemetryManagedCursorStats.PERSIST_OPERATION_COUNTER,
+ attributesSet.getAttributesOperationSucceed(), value ->
assertThat(value).isPositive());
+ assertMetricLongSumValue(otelMetrics,
OpenTelemetryManagedCursorStats.PERSIST_OPERATION_COUNTER,
+ attributesSet.getAttributesOperationFailure(), value ->
assertThat(value).isPositive());
+ assertMetricLongSumValue(otelMetrics,
OpenTelemetryManagedCursorStats.PERSIST_OPERATION_METADATA_STORE_COUNTER,
+ attributesSet.getAttributesOperationSucceed(), value ->
assertThat(value).isPositive());
+ assertMetricLongSumValue(otelMetrics,
OpenTelemetryManagedCursorStats.PERSIST_OPERATION_METADATA_STORE_COUNTER,
+ attributesSet.getAttributesOperationFailure(), 0);
+ assertMetricLongSumValue(otelMetrics,
OpenTelemetryManagedCursorStats.NON_CONTIGUOUS_MESSAGE_RANGE_COUNTER,
+ attributesSet.getAttributes(), 0);
/**
* TODO verify "brk_ml_cursor_persistZookeeperErrors".
* This is not easy to implement, we can use {@link #mockZooKeeper} to
fail ZK, but we cannot identify whether
@@ -210,13 +263,16 @@ public class ManagedCursorMetricsTest extends
MockedPulsarServiceBaseTest {
admin.topics().delete(topicName, true);
}
- private ManagedCursorMXBean getManagedCursorMXBean(String topicName,
String subscriptionName)
- throws ExecutionException, InterruptedException {
+ private ManagedCursorMXBean getManagedCursorMXBean(String topicName,
String subscriptionName) throws Exception {
+ var managedCursor = getManagedCursor(topicName, subscriptionName);
+ return managedCursor.getStats();
+ }
+
+ private ManagedCursor getManagedCursor(String topicName, String
subscriptionName) throws Exception {
final PersistentSubscription persistentSubscription =
(PersistentSubscription) pulsar.getBrokerService()
.getTopic(topicName,
false).get().get().getSubscription(subscriptionName);
- final ManagedCursorImpl managedCursor = (ManagedCursorImpl)
persistentSubscription.getCursor();
- return managedCursor.getStats();
+ return persistentSubscription.getCursor();
}
@Test
@@ -265,9 +321,11 @@ public class ManagedCursorMetricsTest extends
MockedPulsarServiceBaseTest {
}
}
+ var managedCursor1 = getManagedCursor(topicName, subName1);
+ var cursorMXBean1 = managedCursor1.getStats();
+ var managedCursor2 = getManagedCursor(topicName, subName2);
+ var cursorMXBean2 = managedCursor2.getStats();
// Wait for persistent cursor meta.
- ManagedCursorMXBean cursorMXBean1 = getManagedCursorMXBean(topicName,
subName1);
- ManagedCursorMXBean cursorMXBean2 = getManagedCursorMXBean(topicName,
subName2);
Awaitility.await().until(() ->
cursorMXBean1.getWriteCursorLedgerLogicalSize() > 0);
Awaitility.await().until(() ->
cursorMXBean2.getWriteCursorLedgerLogicalSize() > 0);
@@ -281,6 +339,22 @@ public class ManagedCursorMetricsTest extends
MockedPulsarServiceBaseTest {
Assert.assertNotEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_writeLedgerLogicalSize"),
0L);
Assert.assertEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_readLedgerSize"),
0L);
+ var otelMetrics =
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+ var attributes1 = new
ManagedCursorAttributes(managedCursor1).getAttributes();
+ assertMetricLongSumValue(otelMetrics,
OpenTelemetryManagedCursorStats.OUTGOING_BYTE_COUNTER,
+ attributes1, value -> assertThat(value).isPositive());
+ assertMetricLongSumValue(otelMetrics,
OpenTelemetryManagedCursorStats.OUTGOING_BYTE_LOGICAL_COUNTER,
+ attributes1, value -> assertThat(value).isPositive());
+ assertMetricLongSumValue(otelMetrics,
OpenTelemetryManagedCursorStats.INCOMING_BYTE_COUNTER,
+ attributes1, 0);
+
+ var attributes2 = new
ManagedCursorAttributes(managedCursor2).getAttributes();
+ assertMetricLongSumValue(otelMetrics,
OpenTelemetryManagedCursorStats.OUTGOING_BYTE_COUNTER,
+ attributes2, value -> assertThat(value).isPositive());
+ assertMetricLongSumValue(otelMetrics,
OpenTelemetryManagedCursorStats.OUTGOING_BYTE_LOGICAL_COUNTER,
+ attributes2, value -> assertThat(value).isPositive());
+ assertMetricLongSumValue(otelMetrics,
OpenTelemetryManagedCursorStats.INCOMING_BYTE_COUNTER,
+ attributes2, 0);
// cleanup.
consumer.close();
consumer2.close();
diff --git
a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
index 24dd1be8509..41358a72c0d 100644
---
a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
+++
b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
@@ -116,6 +116,7 @@ public interface OpenTelemetryAttributes {
* The status of the Pulsar transaction.
*/
AttributeKey<String> PULSAR_TRANSACTION_STATUS =
AttributeKey.stringKey("pulsar.transaction.status");
+
enum TransactionStatus {
ABORTED,
ACTIVE,
@@ -174,6 +175,28 @@ public interface OpenTelemetryAttributes {
public final Attributes attributes =
Attributes.of(PULSAR_BACKLOG_QUOTA_TYPE, name().toLowerCase());
}
+ // Managed Ledger Attributes
+ /**
+ * The name of the managed ledger.
+ */
+ AttributeKey<String> ML_LEDGER_NAME =
AttributeKey.stringKey("pulsar.managed_ledger.name");
+
+ /**
+ * The name of the managed cursor.
+ */
+ AttributeKey<String> ML_CURSOR_NAME =
AttributeKey.stringKey("pulsar.managed_ledger.cursor.name");
+
+ /**
+ * The status of the managed cursor operation.
+ */
+ AttributeKey<String> ML_CURSOR_OPERATION_STATUS =
+
AttributeKey.stringKey("pulsar.managed_ledger.cursor.operation.status");
+ enum ManagedCursorOperationStatus {
+ SUCCESS,
+ FAILURE;
+ public final Attributes attributes =
Attributes.of(ML_CURSOR_OPERATION_STATUS, name().toLowerCase());
+ }
+
/**
* The name of the remote cluster for a Pulsar replicator.
*/