This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 243ad5a22f7 [feat][broker] PIP-264: Add OpenTelemetry metadata store stats (#22952)
243ad5a22f7 is described below

commit 243ad5a22f7cc533e5a7382c01cb875dcdaee6bb
Author: Dragos Misca <[email protected]>
AuthorDate: Tue Jun 25 22:42:57 2024 -0700

    [feat][broker] PIP-264: Add OpenTelemetry metadata store stats (#22952)
---
 .../org/apache/pulsar/broker/PulsarService.java    | 15 +++-
 .../broker/stats/PulsarBrokerOpenTelemetry.java    |  2 +
 .../AntiAffinityNamespaceGroupTest.java            |  6 +-
 .../stats/OpenTelemetryMetadataStoreStatsTest.java | 94 ++++++++++++++++++++++
 .../testcontext/AbstractTestPulsarService.java     |  7 +-
 .../pulsar/metadata/api/MetadataStoreConfig.java   |  7 ++
 .../metadata/impl/AbstractMetadataStore.java       |  5 +-
 .../metadata/impl/LocalMemoryMetadataStore.java    |  2 +-
 .../pulsar/metadata/impl/RocksdbMetadataStore.java |  2 +-
 .../batching/AbstractBatchedMetadataStore.java     |  4 +-
 .../metadata/impl/oxia/OxiaMetadataStore.java      |  7 +-
 .../impl/stats/BatchMetadataStoreStats.java        | 23 +++++-
 .../metadata/impl/stats/MetadataStoreStats.java    | 20 ++++-
 .../impl/MetadataStoreFactoryImplTest.java         |  3 +-
 14 files changed, 175 insertions(+), 22 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 8cf1376642b..617afc6e5d1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -31,6 +31,7 @@ import io.netty.channel.socket.SocketChannel;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timer;
 import io.netty.util.concurrent.DefaultThreadFactory;
+import io.opentelemetry.api.OpenTelemetry;
 import 
io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
@@ -382,7 +383,8 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
                 DEFAULT_MONOTONIC_CLOCK_GRANULARITY_MILLIS), System::nanoTime);
     }
 
-    public MetadataStore 
createConfigurationMetadataStore(PulsarMetadataEventSynchronizer synchronizer)
+    public MetadataStore 
createConfigurationMetadataStore(PulsarMetadataEventSynchronizer synchronizer,
+                                                          OpenTelemetry 
openTelemetry)
             throws MetadataStoreException {
         return 
MetadataStoreFactory.create(config.getConfigurationMetadataStoreUrl(),
                 MetadataStoreConfig.builder()
@@ -395,6 +397,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
                         
.batchingMaxSizeKb(config.getMetadataStoreBatchingMaxSizeKb())
                         
.metadataStoreName(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
                         .synchronizer(synchronizer)
+                        .openTelemetry(openTelemetry)
                         .build());
     }
 
@@ -845,7 +848,8 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
             localMetadataSynchronizer = 
StringUtils.isNotBlank(config.getMetadataSyncEventTopic())
                     ? new PulsarMetadataEventSynchronizer(this, 
config.getMetadataSyncEventTopic())
                     : null;
-            localMetadataStore = 
createLocalMetadataStore(localMetadataSynchronizer);
+            localMetadataStore = 
createLocalMetadataStore(localMetadataSynchronizer,
+                    
openTelemetry.getOpenTelemetryService().getOpenTelemetry());
             
localMetadataStore.registerSessionListener(this::handleMetadataSessionEvent);
 
             coordinationService = new 
CoordinationServiceImpl(localMetadataStore);
@@ -854,7 +858,8 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
                 configMetadataSynchronizer = 
StringUtils.isNotBlank(config.getConfigurationMetadataSyncEventTopic())
                         ? new PulsarMetadataEventSynchronizer(this, 
config.getConfigurationMetadataSyncEventTopic())
                         : null;
-                configurationMetadataStore = 
createConfigurationMetadataStore(configMetadataSynchronizer);
+                configurationMetadataStore = 
createConfigurationMetadataStore(configMetadataSynchronizer,
+                        
openTelemetry.getOpenTelemetryService().getOpenTelemetry());
                 shouldShutdownConfigurationMetadataStore = true;
             } else {
                 configurationMetadataStore = localMetadataStore;
@@ -1209,7 +1214,8 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
         }
     }
 
-    public MetadataStoreExtended 
createLocalMetadataStore(PulsarMetadataEventSynchronizer synchronizer)
+    public MetadataStoreExtended 
createLocalMetadataStore(PulsarMetadataEventSynchronizer synchronizer,
+                                                          OpenTelemetry 
openTelemetry)
             throws MetadataStoreException, PulsarServerException {
         return MetadataStoreExtended.create(config.getMetadataStoreUrl(),
                 MetadataStoreConfig.builder()
@@ -1222,6 +1228,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
                         
.batchingMaxSizeKb(config.getMetadataStoreBatchingMaxSizeKb())
                         .synchronizer(synchronizer)
                         .metadataStoreName(MetadataStoreConfig.METADATA_STORE)
+                        .openTelemetry(openTelemetry)
                         .build());
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java
index 01ca65d2cc5..c1bcfadaf97 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java
@@ -31,6 +31,8 @@ import org.apache.pulsar.opentelemetry.OpenTelemetryService;
 public class PulsarBrokerOpenTelemetry implements Closeable {
 
     public static final String SERVICE_NAME = "pulsar-broker";
+
+    @Getter
     private final OpenTelemetryService openTelemetryService;
 
     @Getter
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
index 5fbda961c0e..fc2fec96294 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
@@ -26,6 +26,7 @@ import com.google.common.collect.BoundType;
 import com.google.common.collect.Range;
 import com.google.common.collect.Sets;
 import com.google.common.hash.Hashing;
+import io.opentelemetry.api.OpenTelemetry;
 import java.lang.reflect.Field;
 import java.util.Collections;
 import java.util.HashMap;
@@ -136,8 +137,9 @@ public class AntiAffinityNamespaceGroupTest extends 
MockedPulsarServiceBaseTest
 
     protected void beforePulsarStart(PulsarService pulsar) throws Exception {
         if (resources == null) {
-            MetadataStoreExtended localStore = 
pulsar.createLocalMetadataStore(null);
-            MetadataStoreExtended configStore = (MetadataStoreExtended) 
pulsar.createConfigurationMetadataStore(null);
+            MetadataStoreExtended localStore = 
pulsar.createLocalMetadataStore(null, OpenTelemetry.noop());
+            MetadataStoreExtended configStore =
+                    (MetadataStoreExtended) 
pulsar.createConfigurationMetadataStore(null, OpenTelemetry.noop());
             resources = new PulsarResources(localStore, configStore);
         }
         this.createNamespaceIfNotExists(resources, 
NamespaceName.SYSTEM_NAMESPACE.getTenant(),
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryMetadataStoreStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryMetadataStoreStatsTest.java
new file mode 100644
index 00000000000..15689fca5d7
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryMetadataStoreStatsTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats;
+
+import static 
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
+import static org.assertj.core.api.Assertions.assertThat;
+import io.opentelemetry.api.common.Attributes;
+import java.util.concurrent.ExecutorService;
+import lombok.Cleanup;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.testcontext.NonClosingProxyHandler;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.impl.stats.BatchMetadataStoreStats;
+import org.apache.pulsar.metadata.impl.stats.MetadataStoreStats;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class OpenTelemetryMetadataStoreStatsTest extends BrokerTestBase {
+
+    @BeforeMethod(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.baseSetup();
+        setupDefaultTenantAndNamespace();
+
+        // In testing conditions, the metadata store gets initialized before 
Pulsar does, so the OpenTelemetry SDK is
+        // not yet initialized. Work around this issue by recreating the stats 
object once we have access to the SDK.
+        var localMetadataStore = (MetadataStore) 
NonClosingProxyHandler.getDelegate(pulsar.getLocalMetadataStore());
+        var currentStats = (MetadataStoreStats) 
FieldUtils.readField(localMetadataStore, "metadataStoreStats", true);
+        var localMetadataStoreName = (String) 
FieldUtils.readField(currentStats, "metadataStoreName", true);
+
+        currentStats.close();
+        var newStats = new MetadataStoreStats(
+                localMetadataStoreName, 
pulsar.getOpenTelemetry().getOpenTelemetryService().getOpenTelemetry());
+        FieldUtils.writeField(localMetadataStore, "metadataStoreStats", 
newStats, true);
+
+        var currentBatchedStats = (BatchMetadataStoreStats) 
FieldUtils.readField(localMetadataStore, "batchMetadataStoreStats", true);
+        currentBatchedStats.close();
+        var currentExecutor = (ExecutorService) 
FieldUtils.readField(currentBatchedStats, "executor", true);
+        var newBatchedStats = new BatchMetadataStoreStats(
+                localMetadataStoreName, currentExecutor, 
pulsar.getOpenTelemetry().getOpenTelemetryService().getOpenTelemetry());
+        FieldUtils.writeField(localMetadataStore, "batchMetadataStoreStats", 
newBatchedStats, true);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Override
+    protected void 
customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder 
pulsarTestContextBuilder) {
+        super.customizeMainPulsarTestContextBuilder(pulsarTestContextBuilder);
+        pulsarTestContextBuilder.enableOpenTelemetry(true);
+    }
+
+    @Test
+    public void testMetadataStoreStats() throws Exception {
+        var topicName = 
BrokerTestUtil.newUniqueName("persistent://public/default/test-metadata-store-stats");
+
+        @Cleanup
+        var producer = pulsarClient.newProducer().topic(topicName).create();
+
+        producer.newMessage().value("test".getBytes()).send();
+
+        var attributes = Attributes.of(MetadataStoreStats.METADATA_STORE_NAME, 
"metadata-store");
+
+        var metrics = 
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+        assertMetricLongSumValue(metrics, 
MetadataStoreStats.METADATA_STORE_PUT_BYTES_COUNTER_METRIC_NAME,
+                attributes, value -> assertThat(value).isPositive());
+        assertMetricLongSumValue(metrics, 
BatchMetadataStoreStats.EXECUTOR_QUEUE_SIZE_METRIC_NAME, attributes,
+                value -> assertThat(value).isPositive());
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java
index c459098f685..c67714484f4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java
@@ -19,6 +19,7 @@
 
 package org.apache.pulsar.broker.testcontext;
 
+import io.opentelemetry.api.OpenTelemetry;
 import 
io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder;
 import java.io.IOException;
 import java.util.Optional;
@@ -68,7 +69,8 @@ abstract class AbstractTestPulsarService extends 
PulsarService {
     }
 
     @Override
-    public MetadataStore 
createConfigurationMetadataStore(PulsarMetadataEventSynchronizer synchronizer)
+    public MetadataStore 
createConfigurationMetadataStore(PulsarMetadataEventSynchronizer synchronizer,
+                                                          OpenTelemetry 
openTelemetry)
             throws MetadataStoreException {
         if (synchronizer != null) {
             synchronizer.registerSyncListener(
@@ -78,7 +80,8 @@ abstract class AbstractTestPulsarService extends 
PulsarService {
     }
 
     @Override
-    public MetadataStoreExtended 
createLocalMetadataStore(PulsarMetadataEventSynchronizer synchronizer)
+    public MetadataStoreExtended 
createLocalMetadataStore(PulsarMetadataEventSynchronizer synchronizer,
+                                                          OpenTelemetry 
openTelemetry)
             throws MetadataStoreException, PulsarServerException {
         if (synchronizer != null) {
             synchronizer.registerSyncListener(
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
index 5ddfe33c391..be29f843eea 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.metadata.api;
 
+import io.opentelemetry.api.OpenTelemetry;
 import lombok.Builder;
 import lombok.Getter;
 import lombok.ToString;
@@ -92,4 +93,10 @@ public class MetadataStoreConfig {
      * separate clusters.
      */
     private MetadataEventSynchronizer synchronizer;
+
+    /**
+     * OpenTelemetry instance to monitor metadata store operations.
+     */
+    @Builder.Default
+    private OpenTelemetry openTelemetry = OpenTelemetry.noop();
 }
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index 7315e6a04a2..f35f1974632 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -26,6 +26,7 @@ import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.concurrent.DefaultThreadFactory;
+import io.opentelemetry.api.OpenTelemetry;
 import java.time.Instant;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -88,7 +89,7 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
 
     protected abstract CompletableFuture<Boolean> existsFromStore(String path);
 
-    protected AbstractMetadataStore(String metadataStoreName) {
+    protected AbstractMetadataStore(String metadataStoreName, OpenTelemetry 
openTelemetry) {
         this.executor = new ScheduledThreadPoolExecutor(1,
                 new DefaultThreadFactory(
                         StringUtils.isNotBlank(metadataStoreName) ? 
metadataStoreName : getClass().getSimpleName()));
@@ -137,7 +138,7 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
                 });
 
         this.metadataStoreName = metadataStoreName;
-        this.metadataStoreStats = new MetadataStoreStats(metadataStoreName);
+        this.metadataStoreStats = new MetadataStoreStats(metadataStoreName, 
openTelemetry);
     }
 
     @Override
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
index 3909a89cf5e..e95f1947740 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
@@ -78,7 +78,7 @@ public class LocalMemoryMetadataStore extends 
AbstractMetadataStore implements M
 
     public LocalMemoryMetadataStore(String metadataURL, MetadataStoreConfig 
metadataStoreConfig)
             throws MetadataStoreException {
-        super(metadataStoreConfig.getMetadataStoreName());
+        super(metadataStoreConfig.getMetadataStoreName(), 
metadataStoreConfig.getOpenTelemetry());
         String name = metadataURL.substring(MEMORY_SCHEME_IDENTIFIER.length());
         // Local means a private data set
         // update synchronizer and register sync listener
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
index 06f7b260536..20e3c4c2b27 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
@@ -209,7 +209,7 @@ public class RocksdbMetadataStore extends 
AbstractMetadataStore {
      */
     private RocksdbMetadataStore(String metadataURL, MetadataStoreConfig 
metadataStoreConfig)
             throws MetadataStoreException {
-        super(metadataStoreConfig.getMetadataStoreName());
+        super(metadataStoreConfig.getMetadataStoreName(), 
metadataStoreConfig.getOpenTelemetry());
         this.metadataUrl = metadataURL;
         try {
             RocksDB.loadLibrary();
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
index 4fa1c6aca0f..4275920d7f9 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
@@ -56,7 +56,7 @@ public abstract class AbstractBatchedMetadataStore extends 
AbstractMetadataStore
     private final BatchMetadataStoreStats batchMetadataStoreStats;
 
     protected AbstractBatchedMetadataStore(MetadataStoreConfig conf) {
-        super(conf.getMetadataStoreName());
+        super(conf.getMetadataStoreName(), conf.getOpenTelemetry());
 
         this.enabled = conf.isBatchingEnabled();
         this.maxDelayMillis = conf.getBatchingMaxDelayMillis();
@@ -77,7 +77,7 @@ public abstract class AbstractBatchedMetadataStore extends 
AbstractMetadataStore
         // update synchronizer and register sync listener
         updateMetadataEventSynchronizer(conf.getSynchronizer());
         this.batchMetadataStoreStats =
-                new BatchMetadataStoreStats(metadataStoreName, executor);
+                new BatchMetadataStoreStats(metadataStoreName, executor, 
conf.getOpenTelemetry());
     }
 
     @Override
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
index 154a0ec0c4f..e9da7ec7c1a 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.metadata.impl.oxia;
 
+import io.opentelemetry.api.OpenTelemetry;
 import io.streamnative.oxia.client.api.AsyncOxiaClient;
 import io.streamnative.oxia.client.api.DeleteOption;
 import io.streamnative.oxia.client.api.Notification;
@@ -58,7 +59,7 @@ public class OxiaMetadataStore extends AbstractMetadataStore {
     private Optional<MetadataEventSynchronizer> synchronizer;
 
     public OxiaMetadataStore(AsyncOxiaClient oxia, String identity) {
-        super("oxia-metadata");
+        super("oxia-metadata", OpenTelemetry.noop());
         this.client = oxia;
         this.identity = identity;
         this.synchronizer = Optional.empty();
@@ -68,10 +69,10 @@ public class OxiaMetadataStore extends 
AbstractMetadataStore {
     public OxiaMetadataStore(
             @NonNull String serviceAddress,
             @NonNull String namespace,
-            @NonNull MetadataStoreConfig metadataStoreConfig,
+            MetadataStoreConfig metadataStoreConfig,
             boolean enableSessionWatcher)
             throws Exception {
-        super("oxia-metadata");
+        super("oxia-metadata", 
Objects.requireNonNull(metadataStoreConfig).getOpenTelemetry());
 
         var linger = metadataStoreConfig.getBatchingMaxDelayMillis();
         if (!metadataStoreConfig.isBatchingEnabled()) {
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java
index f87155b9259..9549a8df8f9 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pulsar.metadata.impl.stats;
 
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.ObservableLongUpDownCounter;
 import io.prometheus.client.Gauge;
 import io.prometheus.client.Histogram;
 import java.util.concurrent.ExecutorService;
@@ -58,7 +61,10 @@ public final class BatchMetadataStoreStats implements 
AutoCloseable {
     private final Histogram.Child batchExecuteTimeChild;
     private final Histogram.Child opsPerBatchChild;
 
-    public BatchMetadataStoreStats(String metadataStoreName, ExecutorService 
executor) {
+    public static final String EXECUTOR_QUEUE_SIZE_METRIC_NAME = 
"pulsar.broker.metadata.store.executor.queue.size";
+    private final ObservableLongUpDownCounter batchMetadataStoreSizeCounter;
+
+    public BatchMetadataStoreStats(String metadataStoreName, ExecutorService 
executor, OpenTelemetry openTelemetry) {
         if (executor instanceof ThreadPoolExecutor tx) {
             this.executor = tx;
         } else {
@@ -69,8 +75,7 @@ public final class BatchMetadataStoreStats implements 
AutoCloseable {
         EXECUTOR_QUEUE_SIZE.setChild(new Gauge.Child() {
             @Override
             public double get() {
-                return BatchMetadataStoreStats.this.executor == null ? 0 :
-                        
BatchMetadataStoreStats.this.executor.getQueue().size();
+                return getQueueSize();
             }
         }, metadataStoreName);
 
@@ -78,6 +83,17 @@ public final class BatchMetadataStoreStats implements 
AutoCloseable {
         this.batchExecuteTimeChild = 
BATCH_EXECUTE_TIME.labels(metadataStoreName);
         this.opsPerBatchChild = OPS_PER_BATCH.labels(metadataStoreName);
 
+        var meter = openTelemetry.getMeter("org.apache.pulsar");
+        var attributes = Attributes.of(MetadataStoreStats.METADATA_STORE_NAME, 
metadataStoreName);
+        this.batchMetadataStoreSizeCounter = meter
+                .upDownCounterBuilder(EXECUTOR_QUEUE_SIZE_METRIC_NAME)
+                .setDescription("The number of batch operations in the 
metadata store executor queue")
+                .setUnit("{operation}")
+                .buildWithCallback(measurement -> 
measurement.record(getQueueSize(), attributes));
+    }
+
+    private int getQueueSize() {
+        return executor == null ? 0 : executor.getQueue().size();
     }
 
     public void recordOpWaiting(long millis) {
@@ -99,6 +115,7 @@ public final class BatchMetadataStoreStats implements 
AutoCloseable {
             OPS_WAITING.remove(this.metadataStoreName);
             BATCH_EXECUTE_TIME.remove(this.metadataStoreName);
             OPS_PER_BATCH.remove(metadataStoreName);
+            batchMetadataStoreSizeCounter.close();
         }
     }
 }
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/MetadataStoreStats.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/MetadataStoreStats.java
index 45024a68383..5f0383f9520 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/MetadataStoreStats.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/MetadataStoreStats.java
@@ -18,6 +18,10 @@
  */
 package org.apache.pulsar.metadata.impl.stats;
 
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.LongCounter;
 import io.prometheus.client.Counter;
 import io.prometheus.client.Histogram;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -48,6 +52,12 @@ public final class MetadataStoreStats implements 
AutoCloseable {
             .labelNames(METADATA_STORE_LABEL_NAME)
             .register();
 
+    public static final AttributeKey<String> METADATA_STORE_NAME = 
AttributeKey.stringKey("pulsar.metadata.store.name");
+    public static final String METADATA_STORE_PUT_BYTES_COUNTER_METRIC_NAME =
+            "pulsar.broker.metadata.store.outgoing.size";
+    private final Attributes attributes;
+    private final LongCounter putBytesCounter;
+
     private final Histogram.Child getOpsSucceedChild;
     private final Histogram.Child delOpsSucceedChild;
     private final Histogram.Child putOpsSucceedChild;
@@ -58,7 +68,7 @@ public final class MetadataStoreStats implements 
AutoCloseable {
     private final String metadataStoreName;
     private final AtomicBoolean closed = new AtomicBoolean(false);
 
-    public MetadataStoreStats(String metadataStoreName) {
+    public MetadataStoreStats(String metadataStoreName, OpenTelemetry 
openTelemetry) {
         this.metadataStoreName = metadataStoreName;
 
         this.getOpsSucceedChild = OPS_LATENCY.labels(metadataStoreName, 
OPS_TYPE_GET, STATUS_SUCCESS);
@@ -68,6 +78,13 @@ public final class MetadataStoreStats implements 
AutoCloseable {
         this.delOpsFailedChild = OPS_LATENCY.labels(metadataStoreName, 
OPS_TYPE_DEL, STATUS_FAIL);
         this.putOpsFailedChild = OPS_LATENCY.labels(metadataStoreName, 
OPS_TYPE_PUT, STATUS_FAIL);
         this.putBytesChild = PUT_BYTES.labels(metadataStoreName);
+
+        attributes = Attributes.of(METADATA_STORE_NAME, metadataStoreName);
+        putBytesCounter = openTelemetry.getMeter("org.apache.pulsar")
+                .counterBuilder(METADATA_STORE_PUT_BYTES_COUNTER_METRIC_NAME)
+                .setDescription("The total amount of data written to the 
metadata store")
+                .setUnit("{By}")
+                .build();
     }
 
     public void recordGetOpsSucceeded(long millis) {
@@ -81,6 +98,7 @@ public final class MetadataStoreStats implements 
AutoCloseable {
     public void recordPutOpsSucceeded(long millis, int bytes) {
         this.putOpsSucceedChild.observe(millis);
         this.putBytesChild.inc(bytes);
+        this.putBytesCounter.add(bytes, attributes);
     }
 
     public void recordGetOpsFailed(long millis) {
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
index c0159be4303..6ede02b6713 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.metadata.impl;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
+import io.opentelemetry.api.OpenTelemetry;
 import lombok.Cleanup;
 import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataStore;
@@ -91,7 +92,7 @@ public class MetadataStoreFactoryImplTest {
 
     public static class MyMetadataStore extends AbstractMetadataStore {
         protected MyMetadataStore() {
-            super("custom");
+            super("custom", OpenTelemetry.noop());
         }
 
         @Override

Reply via email to