This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 243ad5a22f7 [feat][broker] PIP-264: Add OpenTelemetry metadata store stats (#22952)
243ad5a22f7 is described below
commit 243ad5a22f7cc533e5a7382c01cb875dcdaee6bb
Author: Dragos Misca <[email protected]>
AuthorDate: Tue Jun 25 22:42:57 2024 -0700
[feat][broker] PIP-264: Add OpenTelemetry metadata store stats (#22952)
---
.../org/apache/pulsar/broker/PulsarService.java | 15 +++-
.../broker/stats/PulsarBrokerOpenTelemetry.java | 2 +
.../AntiAffinityNamespaceGroupTest.java | 6 +-
.../stats/OpenTelemetryMetadataStoreStatsTest.java | 94 ++++++++++++++++++++++
.../testcontext/AbstractTestPulsarService.java | 7 +-
.../pulsar/metadata/api/MetadataStoreConfig.java | 7 ++
.../metadata/impl/AbstractMetadataStore.java | 5 +-
.../metadata/impl/LocalMemoryMetadataStore.java | 2 +-
.../pulsar/metadata/impl/RocksdbMetadataStore.java | 2 +-
.../batching/AbstractBatchedMetadataStore.java | 4 +-
.../metadata/impl/oxia/OxiaMetadataStore.java | 7 +-
.../impl/stats/BatchMetadataStoreStats.java | 23 +++++-
.../metadata/impl/stats/MetadataStoreStats.java | 20 ++++-
.../impl/MetadataStoreFactoryImplTest.java | 3 +-
14 files changed, 175 insertions(+), 22 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 8cf1376642b..617afc6e5d1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -31,6 +31,7 @@ import io.netty.channel.socket.SocketChannel;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;
+import io.opentelemetry.api.OpenTelemetry;
import
io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder;
import java.io.IOException;
import java.lang.reflect.Constructor;
@@ -382,7 +383,8 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
DEFAULT_MONOTONIC_CLOCK_GRANULARITY_MILLIS), System::nanoTime);
}
- public MetadataStore
createConfigurationMetadataStore(PulsarMetadataEventSynchronizer synchronizer)
+ public MetadataStore
createConfigurationMetadataStore(PulsarMetadataEventSynchronizer synchronizer,
+ OpenTelemetry
openTelemetry)
throws MetadataStoreException {
return
MetadataStoreFactory.create(config.getConfigurationMetadataStoreUrl(),
MetadataStoreConfig.builder()
@@ -395,6 +397,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
.batchingMaxSizeKb(config.getMetadataStoreBatchingMaxSizeKb())
.metadataStoreName(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
.synchronizer(synchronizer)
+ .openTelemetry(openTelemetry)
.build());
}
@@ -845,7 +848,8 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
localMetadataSynchronizer =
StringUtils.isNotBlank(config.getMetadataSyncEventTopic())
? new PulsarMetadataEventSynchronizer(this,
config.getMetadataSyncEventTopic())
: null;
- localMetadataStore =
createLocalMetadataStore(localMetadataSynchronizer);
+ localMetadataStore =
createLocalMetadataStore(localMetadataSynchronizer,
+
openTelemetry.getOpenTelemetryService().getOpenTelemetry());
localMetadataStore.registerSessionListener(this::handleMetadataSessionEvent);
coordinationService = new
CoordinationServiceImpl(localMetadataStore);
@@ -854,7 +858,8 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
configMetadataSynchronizer =
StringUtils.isNotBlank(config.getConfigurationMetadataSyncEventTopic())
? new PulsarMetadataEventSynchronizer(this,
config.getConfigurationMetadataSyncEventTopic())
: null;
- configurationMetadataStore =
createConfigurationMetadataStore(configMetadataSynchronizer);
+ configurationMetadataStore =
createConfigurationMetadataStore(configMetadataSynchronizer,
+
openTelemetry.getOpenTelemetryService().getOpenTelemetry());
shouldShutdownConfigurationMetadataStore = true;
} else {
configurationMetadataStore = localMetadataStore;
@@ -1209,7 +1214,8 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
}
}
- public MetadataStoreExtended
createLocalMetadataStore(PulsarMetadataEventSynchronizer synchronizer)
+ public MetadataStoreExtended
createLocalMetadataStore(PulsarMetadataEventSynchronizer synchronizer,
+ OpenTelemetry
openTelemetry)
throws MetadataStoreException, PulsarServerException {
return MetadataStoreExtended.create(config.getMetadataStoreUrl(),
MetadataStoreConfig.builder()
@@ -1222,6 +1228,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
.batchingMaxSizeKb(config.getMetadataStoreBatchingMaxSizeKb())
.synchronizer(synchronizer)
.metadataStoreName(MetadataStoreConfig.METADATA_STORE)
+ .openTelemetry(openTelemetry)
.build());
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java
index 01ca65d2cc5..c1bcfadaf97 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java
@@ -31,6 +31,8 @@ import org.apache.pulsar.opentelemetry.OpenTelemetryService;
public class PulsarBrokerOpenTelemetry implements Closeable {
public static final String SERVICE_NAME = "pulsar-broker";
+
+ @Getter
private final OpenTelemetryService openTelemetryService;
@Getter
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
index 5fbda961c0e..fc2fec96294 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
@@ -26,6 +26,7 @@ import com.google.common.collect.BoundType;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import com.google.common.hash.Hashing;
+import io.opentelemetry.api.OpenTelemetry;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.HashMap;
@@ -136,8 +137,9 @@ public class AntiAffinityNamespaceGroupTest extends
MockedPulsarServiceBaseTest
protected void beforePulsarStart(PulsarService pulsar) throws Exception {
if (resources == null) {
- MetadataStoreExtended localStore =
pulsar.createLocalMetadataStore(null);
- MetadataStoreExtended configStore = (MetadataStoreExtended)
pulsar.createConfigurationMetadataStore(null);
+ MetadataStoreExtended localStore =
pulsar.createLocalMetadataStore(null, OpenTelemetry.noop());
+ MetadataStoreExtended configStore =
+ (MetadataStoreExtended)
pulsar.createConfigurationMetadataStore(null, OpenTelemetry.noop());
resources = new PulsarResources(localStore, configStore);
}
this.createNamespaceIfNotExists(resources,
NamespaceName.SYSTEM_NAMESPACE.getTenant(),
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryMetadataStoreStatsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryMetadataStoreStatsTest.java
new file mode 100644
index 00000000000..15689fca5d7
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryMetadataStoreStatsTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats;
+
+import static
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
+import static org.assertj.core.api.Assertions.assertThat;
+import io.opentelemetry.api.common.Attributes;
+import java.util.concurrent.ExecutorService;
+import lombok.Cleanup;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.testcontext.NonClosingProxyHandler;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.impl.stats.BatchMetadataStoreStats;
+import org.apache.pulsar.metadata.impl.stats.MetadataStoreStats;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class OpenTelemetryMetadataStoreStatsTest extends BrokerTestBase {
+
+ @BeforeMethod(alwaysRun = true)
+ @Override
+ protected void setup() throws Exception {
+ super.baseSetup();
+ setupDefaultTenantAndNamespace();
+
+ // In testing conditions, the metadata store gets initialized before Pulsar does, so the OpenTelemetry SDK is
+ // not yet initialized. Work around this issue by recreating the stats object once we have access to the SDK.
+ var localMetadataStore = (MetadataStore)
NonClosingProxyHandler.getDelegate(pulsar.getLocalMetadataStore());
+ var currentStats = (MetadataStoreStats)
FieldUtils.readField(localMetadataStore, "metadataStoreStats", true);
+ var localMetadataStoreName = (String)
FieldUtils.readField(currentStats, "metadataStoreName", true);
+
+ currentStats.close();
+ var newStats = new MetadataStoreStats(
+ localMetadataStoreName,
pulsar.getOpenTelemetry().getOpenTelemetryService().getOpenTelemetry());
+ FieldUtils.writeField(localMetadataStore, "metadataStoreStats",
newStats, true);
+
+ var currentBatchedStats = (BatchMetadataStoreStats)
FieldUtils.readField(localMetadataStore, "batchMetadataStoreStats", true);
+ currentBatchedStats.close();
+ var currentExecutor = (ExecutorService)
FieldUtils.readField(currentBatchedStats, "executor", true);
+ var newBatchedStats = new BatchMetadataStoreStats(
+ localMetadataStoreName, currentExecutor,
pulsar.getOpenTelemetry().getOpenTelemetryService().getOpenTelemetry());
+ FieldUtils.writeField(localMetadataStore, "batchMetadataStoreStats",
newBatchedStats, true);
+ }
+
+ @AfterMethod(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Override
+ protected void
customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder
pulsarTestContextBuilder) {
+ super.customizeMainPulsarTestContextBuilder(pulsarTestContextBuilder);
+ pulsarTestContextBuilder.enableOpenTelemetry(true);
+ }
+
+ @Test
+ public void testMetadataStoreStats() throws Exception {
+ var topicName =
BrokerTestUtil.newUniqueName("persistent://public/default/test-metadata-store-stats");
+
+ @Cleanup
+ var producer = pulsarClient.newProducer().topic(topicName).create();
+
+ producer.newMessage().value("test".getBytes()).send();
+
+ var attributes = Attributes.of(MetadataStoreStats.METADATA_STORE_NAME,
"metadata-store");
+
+ var metrics =
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+ assertMetricLongSumValue(metrics,
MetadataStoreStats.METADATA_STORE_PUT_BYTES_COUNTER_METRIC_NAME,
+ attributes, value -> assertThat(value).isPositive());
+ assertMetricLongSumValue(metrics,
BatchMetadataStoreStats.EXECUTOR_QUEUE_SIZE_METRIC_NAME, attributes,
+ value -> assertThat(value).isPositive());
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java
index c459098f685..c67714484f4 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.testcontext;
+import io.opentelemetry.api.OpenTelemetry;
import
io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder;
import java.io.IOException;
import java.util.Optional;
@@ -68,7 +69,8 @@ abstract class AbstractTestPulsarService extends
PulsarService {
}
@Override
- public MetadataStore
createConfigurationMetadataStore(PulsarMetadataEventSynchronizer synchronizer)
+ public MetadataStore
createConfigurationMetadataStore(PulsarMetadataEventSynchronizer synchronizer,
+ OpenTelemetry
openTelemetry)
throws MetadataStoreException {
if (synchronizer != null) {
synchronizer.registerSyncListener(
@@ -78,7 +80,8 @@ abstract class AbstractTestPulsarService extends
PulsarService {
}
@Override
- public MetadataStoreExtended
createLocalMetadataStore(PulsarMetadataEventSynchronizer synchronizer)
+ public MetadataStoreExtended
createLocalMetadataStore(PulsarMetadataEventSynchronizer synchronizer,
+ OpenTelemetry
openTelemetry)
throws MetadataStoreException, PulsarServerException {
if (synchronizer != null) {
synchronizer.registerSyncListener(
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
index 5ddfe33c391..be29f843eea 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.metadata.api;
+import io.opentelemetry.api.OpenTelemetry;
import lombok.Builder;
import lombok.Getter;
import lombok.ToString;
@@ -92,4 +93,10 @@ public class MetadataStoreConfig {
* separate clusters.
*/
private MetadataEventSynchronizer synchronizer;
+
+ /**
+ * OpenTelemetry instance to monitor metadata store operations.
+ */
+ @Builder.Default
+ private OpenTelemetry openTelemetry = OpenTelemetry.noop();
}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index 7315e6a04a2..f35f1974632 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -26,6 +26,7 @@ import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.annotations.VisibleForTesting;
import io.netty.util.concurrent.DefaultThreadFactory;
+import io.opentelemetry.api.OpenTelemetry;
import java.time.Instant;
import java.util.Collections;
import java.util.EnumSet;
@@ -88,7 +89,7 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
protected abstract CompletableFuture<Boolean> existsFromStore(String path);
- protected AbstractMetadataStore(String metadataStoreName) {
+ protected AbstractMetadataStore(String metadataStoreName, OpenTelemetry
openTelemetry) {
this.executor = new ScheduledThreadPoolExecutor(1,
new DefaultThreadFactory(
StringUtils.isNotBlank(metadataStoreName) ?
metadataStoreName : getClass().getSimpleName()));
@@ -137,7 +138,7 @@ public abstract class AbstractMetadataStore implements
MetadataStoreExtended, Co
});
this.metadataStoreName = metadataStoreName;
- this.metadataStoreStats = new MetadataStoreStats(metadataStoreName);
+ this.metadataStoreStats = new MetadataStoreStats(metadataStoreName,
openTelemetry);
}
@Override
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
index 3909a89cf5e..e95f1947740 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
@@ -78,7 +78,7 @@ public class LocalMemoryMetadataStore extends
AbstractMetadataStore implements M
public LocalMemoryMetadataStore(String metadataURL, MetadataStoreConfig
metadataStoreConfig)
throws MetadataStoreException {
- super(metadataStoreConfig.getMetadataStoreName());
+ super(metadataStoreConfig.getMetadataStoreName(),
metadataStoreConfig.getOpenTelemetry());
String name = metadataURL.substring(MEMORY_SCHEME_IDENTIFIER.length());
// Local means a private data set
// update synchronizer and register sync listener
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
index 06f7b260536..20e3c4c2b27 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
@@ -209,7 +209,7 @@ public class RocksdbMetadataStore extends
AbstractMetadataStore {
*/
private RocksdbMetadataStore(String metadataURL, MetadataStoreConfig
metadataStoreConfig)
throws MetadataStoreException {
- super(metadataStoreConfig.getMetadataStoreName());
+ super(metadataStoreConfig.getMetadataStoreName(),
metadataStoreConfig.getOpenTelemetry());
this.metadataUrl = metadataURL;
try {
RocksDB.loadLibrary();
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
index 4fa1c6aca0f..4275920d7f9 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
@@ -56,7 +56,7 @@ public abstract class AbstractBatchedMetadataStore extends
AbstractMetadataStore
private final BatchMetadataStoreStats batchMetadataStoreStats;
protected AbstractBatchedMetadataStore(MetadataStoreConfig conf) {
- super(conf.getMetadataStoreName());
+ super(conf.getMetadataStoreName(), conf.getOpenTelemetry());
this.enabled = conf.isBatchingEnabled();
this.maxDelayMillis = conf.getBatchingMaxDelayMillis();
@@ -77,7 +77,7 @@ public abstract class AbstractBatchedMetadataStore extends
AbstractMetadataStore
// update synchronizer and register sync listener
updateMetadataEventSynchronizer(conf.getSynchronizer());
this.batchMetadataStoreStats =
- new BatchMetadataStoreStats(metadataStoreName, executor);
+ new BatchMetadataStoreStats(metadataStoreName, executor,
conf.getOpenTelemetry());
}
@Override
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
index 154a0ec0c4f..e9da7ec7c1a 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.metadata.impl.oxia;
+import io.opentelemetry.api.OpenTelemetry;
import io.streamnative.oxia.client.api.AsyncOxiaClient;
import io.streamnative.oxia.client.api.DeleteOption;
import io.streamnative.oxia.client.api.Notification;
@@ -58,7 +59,7 @@ public class OxiaMetadataStore extends AbstractMetadataStore {
private Optional<MetadataEventSynchronizer> synchronizer;
public OxiaMetadataStore(AsyncOxiaClient oxia, String identity) {
- super("oxia-metadata");
+ super("oxia-metadata", OpenTelemetry.noop());
this.client = oxia;
this.identity = identity;
this.synchronizer = Optional.empty();
@@ -68,10 +69,10 @@ public class OxiaMetadataStore extends
AbstractMetadataStore {
public OxiaMetadataStore(
@NonNull String serviceAddress,
@NonNull String namespace,
- @NonNull MetadataStoreConfig metadataStoreConfig,
+ MetadataStoreConfig metadataStoreConfig,
boolean enableSessionWatcher)
throws Exception {
- super("oxia-metadata");
+ super("oxia-metadata",
Objects.requireNonNull(metadataStoreConfig).getOpenTelemetry());
var linger = metadataStoreConfig.getBatchingMaxDelayMillis();
if (!metadataStoreConfig.isBatchingEnabled()) {
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java
index f87155b9259..9549a8df8f9 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java
@@ -18,6 +18,9 @@
*/
package org.apache.pulsar.metadata.impl.stats;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.ObservableLongUpDownCounter;
import io.prometheus.client.Gauge;
import io.prometheus.client.Histogram;
import java.util.concurrent.ExecutorService;
@@ -58,7 +61,10 @@ public final class BatchMetadataStoreStats implements
AutoCloseable {
private final Histogram.Child batchExecuteTimeChild;
private final Histogram.Child opsPerBatchChild;
- public BatchMetadataStoreStats(String metadataStoreName, ExecutorService
executor) {
+ public static final String EXECUTOR_QUEUE_SIZE_METRIC_NAME =
"pulsar.broker.metadata.store.executor.queue.size";
+ private final ObservableLongUpDownCounter batchMetadataStoreSizeCounter;
+
+ public BatchMetadataStoreStats(String metadataStoreName, ExecutorService
executor, OpenTelemetry openTelemetry) {
if (executor instanceof ThreadPoolExecutor tx) {
this.executor = tx;
} else {
@@ -69,8 +75,7 @@ public final class BatchMetadataStoreStats implements
AutoCloseable {
EXECUTOR_QUEUE_SIZE.setChild(new Gauge.Child() {
@Override
public double get() {
- return BatchMetadataStoreStats.this.executor == null ? 0 :
-
BatchMetadataStoreStats.this.executor.getQueue().size();
+ return getQueueSize();
}
}, metadataStoreName);
@@ -78,6 +83,17 @@ public final class BatchMetadataStoreStats implements
AutoCloseable {
this.batchExecuteTimeChild =
BATCH_EXECUTE_TIME.labels(metadataStoreName);
this.opsPerBatchChild = OPS_PER_BATCH.labels(metadataStoreName);
+ var meter = openTelemetry.getMeter("org.apache.pulsar");
+ var attributes = Attributes.of(MetadataStoreStats.METADATA_STORE_NAME,
metadataStoreName);
+ this.batchMetadataStoreSizeCounter = meter
+ .upDownCounterBuilder(EXECUTOR_QUEUE_SIZE_METRIC_NAME)
+ .setDescription("The number of batch operations in the
metadata store executor queue")
+ .setUnit("{operation}")
+ .buildWithCallback(measurement ->
measurement.record(getQueueSize(), attributes));
+ }
+
+ private int getQueueSize() {
+ return executor == null ? 0 : executor.getQueue().size();
}
public void recordOpWaiting(long millis) {
@@ -99,6 +115,7 @@ public final class BatchMetadataStoreStats implements
AutoCloseable {
OPS_WAITING.remove(this.metadataStoreName);
BATCH_EXECUTE_TIME.remove(this.metadataStoreName);
OPS_PER_BATCH.remove(metadataStoreName);
+ batchMetadataStoreSizeCounter.close();
}
}
}
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/MetadataStoreStats.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/MetadataStoreStats.java
index 45024a68383..5f0383f9520 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/MetadataStoreStats.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/MetadataStoreStats.java
@@ -18,6 +18,10 @@
*/
package org.apache.pulsar.metadata.impl.stats;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.LongCounter;
import io.prometheus.client.Counter;
import io.prometheus.client.Histogram;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -48,6 +52,12 @@ public final class MetadataStoreStats implements
AutoCloseable {
.labelNames(METADATA_STORE_LABEL_NAME)
.register();
+ public static final AttributeKey<String> METADATA_STORE_NAME =
AttributeKey.stringKey("pulsar.metadata.store.name");
+ public static final String METADATA_STORE_PUT_BYTES_COUNTER_METRIC_NAME =
+ "pulsar.broker.metadata.store.outgoing.size";
+ private final Attributes attributes;
+ private final LongCounter putBytesCounter;
+
private final Histogram.Child getOpsSucceedChild;
private final Histogram.Child delOpsSucceedChild;
private final Histogram.Child putOpsSucceedChild;
@@ -58,7 +68,7 @@ public final class MetadataStoreStats implements
AutoCloseable {
private final String metadataStoreName;
private final AtomicBoolean closed = new AtomicBoolean(false);
- public MetadataStoreStats(String metadataStoreName) {
+ public MetadataStoreStats(String metadataStoreName, OpenTelemetry
openTelemetry) {
this.metadataStoreName = metadataStoreName;
this.getOpsSucceedChild = OPS_LATENCY.labels(metadataStoreName,
OPS_TYPE_GET, STATUS_SUCCESS);
@@ -68,6 +78,13 @@ public final class MetadataStoreStats implements
AutoCloseable {
this.delOpsFailedChild = OPS_LATENCY.labels(metadataStoreName,
OPS_TYPE_DEL, STATUS_FAIL);
this.putOpsFailedChild = OPS_LATENCY.labels(metadataStoreName,
OPS_TYPE_PUT, STATUS_FAIL);
this.putBytesChild = PUT_BYTES.labels(metadataStoreName);
+
+ attributes = Attributes.of(METADATA_STORE_NAME, metadataStoreName);
+ putBytesCounter = openTelemetry.getMeter("org.apache.pulsar")
+ .counterBuilder(METADATA_STORE_PUT_BYTES_COUNTER_METRIC_NAME)
+ .setDescription("The total amount of data written to the
metadata store")
+ .setUnit("{By}")
+ .build();
}
public void recordGetOpsSucceeded(long millis) {
@@ -81,6 +98,7 @@ public final class MetadataStoreStats implements
AutoCloseable {
public void recordPutOpsSucceeded(long millis, int bytes) {
this.putOpsSucceedChild.observe(millis);
this.putBytesChild.inc(bytes);
+ this.putBytesCounter.add(bytes, attributes);
}
public void recordGetOpsFailed(long millis) {
diff --git
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
index c0159be4303..6ede02b6713 100644
---
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
+++
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.metadata.impl;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
+import io.opentelemetry.api.OpenTelemetry;
import lombok.Cleanup;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataStore;
@@ -91,7 +92,7 @@ public class MetadataStoreFactoryImplTest {
public static class MyMetadataStore extends AbstractMetadataStore {
protected MyMetadataStore() {
- super("custom");
+ super("custom", OpenTelemetry.noop());
}
@Override