This is an automated email from the ASF dual-hosted git repository.

zhaocong pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 56cd23c301d Revert "[improve][meta] PIP-453: Improve the metadata store threading model (#25187)"
56cd23c301d is described below

commit 56cd23c301dd1a9ae6249c51439da5e142037185
Author: coderzc <[email protected]>
AuthorDate: Fri Feb 13 10:49:58 2026 +0800

    Revert "[improve][meta] PIP-453: Improve the metadata store threading model (#25187)"
    
    This reverts commit 786864c04d772f36f6da9363406173af6c5ad1f2.
---
 conf/broker.conf                                   |   2 -
 conf/standalone.conf                               |   3 -
 pip/pip-453.md                                     |  82 ---------
 .../IsolatedBookieEnsemblePlacementPolicy.java     |   3 -
 .../apache/pulsar/broker/ServiceConfiguration.java |   6 -
 .../IsolatedBookieEnsemblePlacementPolicyTest.java |  34 ++--
 .../org/apache/pulsar/broker/PulsarService.java    |   2 -
 .../apache/pulsar/broker/PulsarServiceTest.java    |  65 --------
 .../stats/OpenTelemetryMetadataStoreStatsTest.java |  12 ++
 .../pulsar/metadata/api/MetadataStoreConfig.java   |   3 -
 .../metadata/cache/impl/MetadataCacheImpl.java     | 185 +++++++++------------
 .../metadata/impl/AbstractMetadataStore.java       |  77 ++++-----
 .../pulsar/metadata/impl/EtcdMetadataStore.java    |  19 ++-
 .../metadata/impl/LocalMemoryMetadataStore.java    |   2 +-
 .../pulsar/metadata/impl/RocksdbMetadataStore.java |   2 +-
 .../pulsar/metadata/impl/ZKMetadataStore.java      |  19 ++-
 .../batching/AbstractBatchedMetadataStore.java     |  39 ++---
 .../metadata/impl/oxia/OxiaMetadataStore.java      |   4 +-
 .../impl/stats/BatchMetadataStoreStats.java        |  43 ++++-
 .../impl/MetadataStoreFactoryImplTest.java         |   2 +-
 20 files changed, 211 insertions(+), 393 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index b2e74569f14..8b6ec12d7f2 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -954,8 +954,6 @@ metadataStoreBatchingMaxOperations=1000
 # Maximum size of a batch
 metadataStoreBatchingMaxSizeKb=128
 
-# The number of threads used for serializing and deserializing data to and from the metadata store
-metadataStoreSerDesThreads=1
 
 ### --- Authentication --- ###
 
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 571cc0fbbe8..e5adf9a9637 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -430,9 +430,6 @@ metadataStoreBatchingMaxOperations=1000
 # Maximum size of a batch
 metadataStoreBatchingMaxSizeKb=128
 
-# The number of threads used for serializing and deserializing data to and from the metadata store
-metadataStoreSerDesThreads=1
-
 ### --- TLS --- ###
 # Deprecated - Use webServicePortTls and brokerServicePortTls instead
 tlsEnabled=false
diff --git a/pip/pip-453.md b/pip/pip-453.md
deleted file mode 100644
index f9109798ba7..00000000000
--- a/pip/pip-453.md
+++ /dev/null
@@ -1,82 +0,0 @@
-# PIP-453: Improve the metadata store threading model
-
-# Background knowledge
-
-The `pulsar-metadata` module provides two abstractions for interacting with metadata stores:
-- `MetadataStore`: the wrapper on the actual underlying metadata store (e.g. ZooKeeper), which has caches for value and children of a given key.
-- `MetadataCache<T>`: a typed cache layer on top of `MetadataStore`, which performs serialization and deserialization of data between `T` and `byte[]`.
-
-The `MetadataStore` instance is unique in each broker, and is shared by multiple `MetadataCache<T>` instances.
-
-However, a single thread whose name starts with the metadata store name (e.g. `ZK-MetadataStore`) is used by implementations of them. This thread is used in the following tasks:
-1. Executing callbacks of APIs like `put`.
-2. Executing notification handlers, including `AbstractMetadataStore#accept`, which calls `accept` methods of all `MetadataCache` instances and all listeners registered by `MetadataStore#registerListener`.
-3. For ZooKeeper and Etcd, which support batching requests, it's used to schedule flushing tasks at a fixed rate, which is determined by the `metadataStoreBatchingMaxDelayMillis` config (default: 5 ms).
-4. Scheduling some other tasks, e.g. retrying failed operations.
-
-It should be noted that `MetadataCache` executes the compute sensitive tasks like serialization in the `MetadataStore` callback. When the number of metadata operations grows, this thread is easy to be overwhelmed. It also affects the topic loading, which involves many metadata operations, this thread can be overwhelmed and block other tasks. For example, in a production environment, it's observed that the `pulsar_batch_metadata_store_queue_wait_time` metric is high (100 ms), which should [...]
-
-# Motivation
-
-The single thread model is inefficient when there are many metadata operations. For example, when a broker is down and the topics owned by this broker will be transferred to the new owner broker. Since the new owner broker might never owned them before, even the `MetadataCache` caches are cold, which results in many metadata operations. However, the CPU-bound tasks like serialization and deserialization are executed in the `MetadataStore` thread, which makes it easy to be overwhelmed. Th [...]
-
-In a production environment, there is a case when the metadata operation rate increased suddenly, the `pulsar_batch_metadata_store_queue_wait_time_ms_bucket` metric increased to ~100 ms, which is a part of the total latency of a metadata operation. As a result, the total P99 get latency (`pulsar_metadata_store_ops_latency_ms_bucket{type="get"}`) increased to 2 seconds.
-
-The 3rd task in the previous section is scheduled via `scheduleAtFixedRate`, which means if the task is not executed in time (5 ms by default), the task will be executed immediately again in a short time, which also burdens the single metadata store thread.
-
-# Goals
-
-## In Scope
-
-Improve the existing thread model to handle various tasks on metadata store, which could avoid a single thread being overwhelmed when there are many metadata operations.
-
-## Out of Scope
-
-Actually the batching mechanism introduced by [#13043](https://github.com/apache/pulsar/pull/13043) is harmful. The `flush` method, which is responsible to send a batch of metadata operations to broker, is called in the metadata store thread rather than the caller thread. The trade-off of the higher throughput is the lower latency. The benefit is limited because in most time the metadata operation rate is not so high. See this [test report](https://github.com/BewareMyPower/zookeeper-benc [...]
-
-This proposal doesn't intend to change the existing batching mechanism or disable it by default. It only improves the threading model to avoid the single thread being overwhelmed.
-
-Additionally, some code paths execute the compute intensive tasks in the metadata store thread directly (e.g. `store.get(path).thenApply(/* ... */)`), this proposal does not aim at changing them to asynchronous methods (e.g. `thenApplyAsync`).
-
-# High Level Design
-
-Create 4 sets of threads:
-- `<name>-event`: the original metadata store thread, which is now only responsible to handle notifications. This executor won't be a `ScheduledExecutorService` anymore.
-- `<name>-scheduler`: a single thread, which is used to schedule tasks like flushing and retrying failed operations.
-- `<name>-batch-flusher`: a single thread, which is used to schedule the flushing task at a fixed rate. It won't be created if `metadataStoreBatchingEnabled` is false.
-- `<name>-worker`: a fixed thread pool shared by all `MetadataCache` instances to execute compute intensive tasks like serialization and deserialization. The same path will be handled by the same thread to keep the processing order on the same path.
-
-Regarding the callbacks, don't switch to a different thread. This change is not breaking because the underlying metadata store usually executes the callback in a single thread (e.g. `<name>-EventThread` in ZooKeeper) like the single thread in the current implementation. The caller should be responsible to manage worker threads on the metadata operation result if the callback is compute intensive.
-
-The only concern is that introducing a new thread to execute callbacks allows waiting for the future of metadata store APIs in the callback. After this change, the following use case could be a dead lock:
-
-```java
-metadataStore.get(path).thenApply(__ -> metadataStore.get(otherPath).join());;
-```
-
-# Detailed Design
-
-## Public-facing Changes
-
-### Configuration
-
-Add a configurations to specify the number of worker threads for `MetadataCache`:
-
-```java
-    @FieldContext(
-            category = CATEGORY_SERVER,
-            doc = "The number of threads uses for serializing and deserializing data to and from the metadata store"
-    )
-    private int metadataStoreSerDesThreads = 1;
-```
-
-Use 1 as the default value since the serialization and deserialization tasks are not frequent. This separated thread pool is mainly added to avoid blocking the metadata store callback thread.
-
-### Metrics
-
-The `pulsar_batch_metadata_store_executor_queue_size` metric will be removed because the `<name>-batch-flusher` thread won't execute other tasks except for flushing.
-
-# Links
-
-* Mailing List discussion thread: https://lists.apache.org/thread/0cfdyvj96gw1sp1mo2zghl0lmsms5w1d
-* Mailing List voting thread: https://lists.apache.org/thread/cktj2k8myw076yggn63k8yxs5357yd61
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
index 4ef1c594be4..878bbc4d654 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
@@ -28,7 +28,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
 import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
@@ -58,8 +57,6 @@ public class IsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePlac
     // the secondary group.
     private ImmutablePair<Set<String>, Set<String>> defaultIsolationGroups;
 
-    @Getter
-    @VisibleForTesting
     private MetadataCache<BookiesRackConfiguration> bookieMappingCache;
 
     private static final String PULSAR_SYSTEM_TOPIC_ISOLATION_GROUP = "*";
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 93670bb2377..44b62709fab 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -498,12 +498,6 @@ public class ServiceConfiguration implements PulsarConfiguration {
     )
     private boolean metadataStoreAllowReadOnlyOperations;
 
-    @FieldContext(
-            category = CATEGORY_SERVER,
-            doc = "The number of threads used for serializing and deserializing data to and from the metadata store"
-    )
-    private int metadataStoreSerDesThreads = 1;
-
     @Deprecated
     @FieldContext(
         category = CATEGORY_SERVER,
diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
index 936b04386ff..68f92ab416d 100644
--- a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
@@ -22,15 +22,12 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotEquals;
-import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Sets;
 import io.netty.util.HashedWheelTimer;
 import java.nio.charset.StandardCharsets;
-import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -291,7 +288,8 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
         secondaryBookieGroup.put(BOOKIE4, BookieInfo.builder().rack("rack0").build());
         bookieMapping.put("group2", secondaryBookieGroup);
 
-        updateBookieInfo(isolationPolicy, jsonMapper.writeValueAsBytes(bookieMapping));
+        store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping),
+                Optional.empty()).join();
 
         ensemble = isolationPolicy.newEnsemble(2, 2, 2, Collections.emptyMap(),
                 null).getResult();
@@ -342,7 +340,8 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
                 + "\": {\"rack\": \"rack0\", \"hostname\": \"bookie3.example.com\"}, \"" + BOOKIE4
                 + "\": {\"rack\": \"rack2\", \"hostname\": \"bookie4.example.com\"}}}";
 
-        updateBookieInfo(isolationPolicy, data.getBytes(StandardCharsets.UTF_8));
+        store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, data.getBytes(StandardCharsets.UTF_8),
+                Optional.empty()).join();
 
         List<BookieId> ensemble = isolationPolicy.newEnsemble(2, 2, 2, Collections.emptyMap(),
                 new HashSet<>()).getResult();
@@ -400,7 +399,8 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
         bookieMapping.put("group1", mainBookieGroup);
         bookieMapping.put("group2", secondaryBookieGroup);
 
-        updateBookieInfo(isolationPolicy, jsonMapper.writeValueAsBytes(bookieMapping));
+        store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping),
+                Optional.empty()).join();
 
         ensemble = isolationPolicy.newEnsemble(3, 3, 3, Collections.emptyMap(),
                 new HashSet<>()).getResult();
@@ -784,7 +784,8 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
         bookieMapping.put(isolationGroup2, group2);
         bookieMapping.put(isolationGroup3, group3);
 
-        updateBookieInfo(isolationPolicy, jsonMapper.writeValueAsBytes(bookieMapping));
+        store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping),
+                Optional.empty()).join();
 
         groups.setLeft(Sets.newHashSet(isolationGroup1, isolationGroup3));
         groups.setRight(Sets.newHashSet(""));
@@ -807,7 +808,8 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
         bookieMapping.put(isolationGroup1, group1);
         bookieMapping.put(isolationGroup2, group2);
 
-        updateBookieInfo(isolationPolicy, jsonMapper.writeValueAsBytes(bookieMapping));
+        store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping),
+                Optional.empty()).join();
 
         groups.setLeft(Sets.newHashSet(isolationGroup1));
         groups.setRight(Sets.newHashSet(isolationGroup2));
@@ -829,24 +831,12 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
         bookieMapping.put(isolationGroup1, group1);
         bookieMapping.put(isolationGroup2, group2);
 
-        updateBookieInfo(isolationPolicy, jsonMapper.writeValueAsBytes(bookieMapping));
+        store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping),
+                Optional.empty()).join();
 
         groups.setLeft(Sets.newHashSet(isolationGroup1));
         groups.setRight(Sets.newHashSet(isolationGroup2));
         blacklist = isolationPolicy.getExcludedBookiesWithIsolationGroups(2, groups);
         assertTrue(blacklist.isEmpty());
     }
-
-    // The policy gets the bookie info asynchronously before each query or update, when putting the bookie info into
-    // the metadata store, the cache needs some time to receive the notification and update accordingly.
-    private void updateBookieInfo(IsolatedBookieEnsemblePlacementPolicy isolationPolicy, byte[] bookieInfo) {
-        final var cache = isolationPolicy.getBookieMappingCache();
-        assertNotNull(cache); // the policy must have been initialized
-
-        final var key = BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH;
-        final var previousBookieInfo = cache.getIfCached(key);
-        store.put(key, bookieInfo, Optional.empty()).join();
-        Awaitility.await().atMost(Duration.ofSeconds(1)).untilAsserted(() ->
-                assertNotEquals(cache.getIfCached(key), previousBookieInfo));
-    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index eaebd7fa58c..e61fbfac566 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -434,7 +434,6 @@ public class PulsarService implements AutoCloseable, ShutdownService {
                         .synchronizer(synchronizer)
                         .openTelemetry(openTelemetry)
                         .nodeSizeStats(new DefaultMetadataNodeSizeStats())
-                        .numSerDesThreads(config.getMetadataStoreSerDesThreads())
                         .build());
     }
 
@@ -1306,7 +1305,6 @@ public class PulsarService implements AutoCloseable, ShutdownService {
                         .metadataStoreName(MetadataStoreConfig.METADATA_STORE)
                         .openTelemetry(openTelemetry)
                         .nodeSizeStats(new DefaultMetadataNodeSizeStats())
-                        .numSerDesThreads(config.getMetadataStoreSerDesThreads())
                         .build());
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
index 6195e9cdae5..6c04889d8f1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
@@ -25,15 +25,10 @@ import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import static org.testng.AssertJUnit.assertSame;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
-import java.util.function.BiConsumer;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -43,10 +38,6 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
-import org.apache.pulsar.metadata.api.MetadataCacheConfig;
-import org.apache.pulsar.metadata.api.MetadataSerde;
-import org.apache.pulsar.metadata.api.MetadataStore;
-import org.apache.pulsar.metadata.api.Stat;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.Test;
 
@@ -348,60 +339,4 @@ public class PulsarServiceTest extends MockedPulsarServiceBaseTest {
             assertTrue(e instanceof PulsarClientException.TimeoutException);
         }
     }
-
-    @Test
-    public void testMetadataSerDesThreads() throws Exception {
-        final var numSerDesThreads = 5;
-        final var config = new ServiceConfiguration();
-        config.setMetadataStoreSerDesThreads(numSerDesThreads);
-        config.setClusterName("test");
-        config.setMetadataStoreUrl("memory:local");
-        config.setConfigurationMetadataStoreUrl("memory:local");
-
-        @Cleanup final var pulsar = new PulsarService(config);
-        pulsar.start();
-
-        BiConsumer<MetadataStore, String> verifier = (store, prefix) -> {
-            final var serDes = new CustomMetadataSerDes();
-            final var cache = store.getMetadataCache(prefix, serDes, MetadataCacheConfig.builder().build());
-            for (int i = 0; i < 100 && serDes.threadNameToSerializedPaths.size() < numSerDesThreads; i++) {
-                cache.create(prefix + i, "value-" + i).join();
-                final var value = cache.get(prefix + i).join();
-                assertEquals(value.orElseThrow(), "value-" + i);
-                final var newValue = cache.readModifyUpdate(prefix + i, s -> s + "-updated").join();
-                assertEquals(newValue, "value-" + i + "-updated");
-                // Verify the serialization and deserialization are handled by the same thread
-                assertEquals(serDes.threadNameToSerializedPaths, serDes.threadNameToDeserializedPaths);
-            }
-            log.info("SerDes thread mapping: {}", serDes.threadNameToSerializedPaths);
-            assertEquals(serDes.threadNameToSerializedPaths.keySet().size(), numSerDesThreads);
-            // Verify a path cannot be handled by multiple threads
-            final var paths = serDes.threadNameToSerializedPaths.values().stream()
-                .flatMap(Set::stream).sorted().toList();
-            assertEquals(paths.stream().distinct().toList(), paths);
-        };
-
-        verifier.accept(pulsar.getLocalMetadataStore(), "/test-local/");
-        verifier.accept(pulsar.getConfigurationMetadataStore(), "/test-config/");
-    }
-
-    private static class CustomMetadataSerDes implements MetadataSerde<String> {
-
-        final Map<String, Set<String>> threadNameToSerializedPaths = new ConcurrentHashMap<>();
-        final Map<String, Set<String>> threadNameToDeserializedPaths = new ConcurrentHashMap<>();
-
-        @Override
-        public byte[] serialize(String path, String value) throws IOException{
-            threadNameToSerializedPaths.computeIfAbsent(Thread.currentThread().getName(),
-                    __ -> ConcurrentHashMap.newKeySet()).add(path);
-            return value.getBytes();
-        }
-
-        @Override
-        public String deserialize(String path, byte[] data, Stat stat) throws IOException {
-            threadNameToDeserializedPaths.computeIfAbsent(Thread.currentThread().getName(),
-                    __ -> ConcurrentHashMap.newKeySet()).add(path);
-            return new String(data);
-        }
-    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryMetadataStoreStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryMetadataStoreStatsTest.java
index 390aa1e49e2..9e8bde20b88 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryMetadataStoreStatsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryMetadataStoreStatsTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.stats;
 import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
 import static org.assertj.core.api.Assertions.assertThat;
 import io.opentelemetry.api.common.Attributes;
+import java.util.concurrent.ExecutorService;
 import lombok.Cleanup;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.pulsar.broker.BrokerTestUtil;
@@ -28,6 +29,7 @@ import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.broker.testcontext.NonClosingProxyHandler;
 import org.apache.pulsar.broker.testcontext.PulsarTestContext;
 import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.impl.stats.BatchMetadataStoreStats;
 import org.apache.pulsar.metadata.impl.stats.MetadataStoreStats;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -51,6 +53,14 @@ public class OpenTelemetryMetadataStoreStatsTest extends BrokerTestBase {
         var newStats = new MetadataStoreStats(
                 localMetadataStoreName, pulsar.getOpenTelemetry().getOpenTelemetryService().getOpenTelemetry());
         FieldUtils.writeField(localMetadataStore, "metadataStoreStats", newStats, true);
+
+        var currentBatchedStats = (BatchMetadataStoreStats) FieldUtils.readField(localMetadataStore,
+                "batchMetadataStoreStats", true);
+        currentBatchedStats.close();
+        var currentExecutor = (ExecutorService) FieldUtils.readField(currentBatchedStats, "executor", true);
+        var newBatchedStats = new BatchMetadataStoreStats(localMetadataStoreName, currentExecutor,
+                pulsar.getOpenTelemetry().getOpenTelemetryService().getOpenTelemetry());
+        FieldUtils.writeField(localMetadataStore, "batchMetadataStoreStats", newBatchedStats, true);
     }
 
     @AfterMethod(alwaysRun = true)
@@ -79,5 +89,7 @@ public class OpenTelemetryMetadataStoreStatsTest extends BrokerTestBase {
         var metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
         assertMetricLongSumValue(metrics, MetadataStoreStats.METADATA_STORE_PUT_BYTES_COUNTER_METRIC_NAME,
                 attributes, value -> assertThat(value).isPositive());
+        assertMetricLongSumValue(metrics, BatchMetadataStoreStats.EXECUTOR_QUEUE_SIZE_METRIC_NAME, attributes,
+                value -> assertThat(value).isPositive());
     }
 }
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
index fcde0dce840..ef50dc87691 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
@@ -104,7 +104,4 @@ public class MetadataStoreConfig {
      * The estimator to estimate the payload length of metadata node, which used to limit the batch size requested.
      */
     private MetadataNodeSizeStats nodeSizeStats;
-
-    @Builder.Default
-    private final int numSerDesThreads = 1;
 }
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
index ca165f0464e..b1f0572547c 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
@@ -39,10 +39,10 @@ import java.util.function.Supplier;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
-import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.pulsar.common.stats.CacheMetricsCollector;
 import org.apache.pulsar.common.util.Backoff;
 import org.apache.pulsar.metadata.api.CacheGetResult;
+import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.MetadataCacheConfig;
 import org.apache.pulsar.metadata.api.MetadataSerde;
@@ -62,26 +62,23 @@ public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notifica
     private final MetadataStore store;
     private final MetadataStoreExtended storeExtended;
     private final MetadataSerde<T> serde;
-    private final OrderedExecutor executor;
-    private final ScheduledExecutorService schedulerExecutor;
+    private final ScheduledExecutorService executor;
     private final MetadataCacheConfig<T> cacheConfig;
 
     private final AsyncLoadingCache<String, Optional<CacheGetResult<T>>> objCache;
 
     public MetadataCacheImpl(String cacheName, MetadataStore store, TypeReference<T> typeRef,
-                             MetadataCacheConfig<T> cacheConfig, OrderedExecutor executor,
-                             ScheduledExecutorService schedulerExecutor) {
-        this(cacheName, store, new JSONMetadataSerdeTypeRef<>(typeRef), cacheConfig, executor, schedulerExecutor);
+                             MetadataCacheConfig<T> cacheConfig, ScheduledExecutorService executor) {
+        this(cacheName, store, new JSONMetadataSerdeTypeRef<>(typeRef), cacheConfig, executor);
     }
 
     public MetadataCacheImpl(String cacheName, MetadataStore store, JavaType type, MetadataCacheConfig<T> cacheConfig,
-                             OrderedExecutor executor, ScheduledExecutorService schedulerExecutor) {
-        this(cacheName, store, new JSONMetadataSerdeSimpleType<>(type), cacheConfig, executor, schedulerExecutor);
+                             ScheduledExecutorService executor) {
+        this(cacheName, store, new JSONMetadataSerdeSimpleType<>(type), cacheConfig, executor);
     }
 
     public MetadataCacheImpl(String cacheName, MetadataStore store, MetadataSerde<T> serde,
-                             MetadataCacheConfig<T> cacheConfig, OrderedExecutor executor,
-                             ScheduledExecutorService schedulerExecutor) {
+                             MetadataCacheConfig<T> cacheConfig, ScheduledExecutorService executor) {
         this.store = store;
         if (store instanceof MetadataStoreExtended) {
             this.storeExtended = (MetadataStoreExtended) store;
@@ -91,7 +88,6 @@ public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notifica
         this.serde = serde;
         this.cacheConfig = cacheConfig;
         this.executor = executor;
-        this.schedulerExecutor = schedulerExecutor;
 
         Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder();
         if (cacheConfig.getRefreshAfterWriteMillis() > 0) {
@@ -105,9 +101,6 @@ public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notifica
                 .buildAsync(new AsyncCacheLoader<String, Optional<CacheGetResult<T>>>() {
                     @Override
                     public CompletableFuture<Optional<CacheGetResult<T>>> asyncLoad(String key, Executor executor) {
-                        if (log.isDebugEnabled()) {
-                            log.debug("Loading key {} into metadata cache {}", key, cacheName);
-                        }
                         return readValueFromStore(key);
                     }
 
@@ -117,16 +110,12 @@ public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notifica
                             Optional<CacheGetResult<T>> oldValue,
                             Executor executor) {
                         if (store instanceof AbstractMetadataStore && ((AbstractMetadataStore) store).isConnected()) {
-                            if (log.isDebugEnabled()) {
-                                log.debug("Reloading key {} into metadata cache {}", key, cacheName);
-                            }
-                            final var future = readValueFromStore(key);
-                            future.thenAccept(val -> {
+                            return readValueFromStore(key).thenApply(val -> {
                                 if (cacheConfig.getAsyncReloadConsumer() != null) {
                                     cacheConfig.getAsyncReloadConsumer().accept(key, val);
                                 }
+                                return val;
                             });
-                            return future;
                         } else {
                             // Do not try to refresh the cache item if we know that we're not connected to the
                             // metadata store
@@ -139,46 +128,22 @@ public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notifica
     }
 
     private CompletableFuture<Optional<CacheGetResult<T>>> readValueFromStore(String path) {
-        final var future = new CompletableFuture<Optional<CacheGetResult<T>>>();
-        store.get(path).thenComposeAsync(optRes -> {
-            // There could be multiple pending reads for the same path, for example, when a path is created,
-            // 1. The `accept` method will call `refresh`
-            // 2. The `put` method will call `refresh` after the metadata store put operation is done
-            // Both will call this method and the same result will be read. In this case, we only need to deserialize
-            // the value once.
-            if (!optRes.isPresent()) {
-                if (log.isDebugEnabled()) {
-                    log.debug("Key {} not found in metadata store", path);
-                }
-                return FutureUtils.value(Optional.<CacheGetResult<T>>empty());
-            }
-            final var res = optRes.get();
-            final var cachedFuture = objCache.getIfPresent(path);
-            if (cachedFuture != null && cachedFuture != future) {
-                if (log.isDebugEnabled()) {
-                    log.debug("A new read on key {} is in progress or completed, ignore this one", path);
-                }
-                return cachedFuture;
-            }
-            try {
-                T obj = serde.deserialize(path, res.getValue(), res.getStat());
-                if (log.isDebugEnabled()) {
-                    log.debug("Deserialized value for key {} (version: {}): {}", path, res.getStat().getVersion(),
-                        obj);
-                }
-                return FutureUtils.value(Optional.of(new CacheGetResult<>(obj, res.getStat())));
-            } catch (Throwable t) {
-                return FutureUtils.exception(new ContentDeserializationException(
-                    "Failed to deserialize payload for key '" + path + "'", t));
-            }
-        }, executor.chooseThread(path)).whenComplete((result, e) -> {
-            if (e != null) {
-                future.completeExceptionally(e.getCause());
-            } else {
-                future.complete(result);
-            }
-        });
-        return future;
+        return store.get(path)
+                .thenCompose(optRes -> {
+                    if (!optRes.isPresent()) {
+                        return FutureUtils.value(Optional.empty());
+                    }
+
+                    try {
+                        GetResult res = optRes.get();
+                        T obj = serde.deserialize(path, res.getValue(), res.getStat());
+                        return FutureUtils
+                                .value(Optional.of(new CacheGetResult<>(obj, res.getStat())));
+                    } catch (Throwable t) {
+                        return FutureUtils.exception(new ContentDeserializationException(
+                                "Failed to deserialize payload for key '" + path + "'", t));
+                    }
+                });
     }
 
     @Override
@@ -204,9 +169,8 @@ public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notifica
 
     @Override
     public CompletableFuture<T> readModifyUpdateOrCreate(String path, Function<Optional<T>, T> modifyFunction) {
-        final var executor = this.executor.chooseThread(path);
         return executeWithRetry(() -> objCache.get(path)
-                .thenComposeAsync(optEntry -> {
+                .thenCompose(optEntry -> {
                     Optional<T> currentValue;
                     long expectedVersion;
 
@@ -238,14 +202,13 @@ public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notifica
                     return store.put(path, newValue, Optional.of(expectedVersion)).thenAccept(__ -> {
                         refresh(path);
                     }).thenApply(__ -> newValueObj);
-                }, executor), path);
+                }), path);
     }
 
     @Override
     public CompletableFuture<T> readModifyUpdate(String path, Function<T, T> modifyFunction) {
-        final var executor = this.executor.chooseThread(path);
         return executeWithRetry(() -> objCache.get(path)
-                .thenComposeAsync(optEntry -> {
+                .thenCompose(optEntry -> {
                     if (!optEntry.isPresent()) {
                         return FutureUtils.exception(new NotFoundException(""));
                     }
@@ -268,57 +231,59 @@ public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notifica
                     return store.put(path, newValue, Optional.of(expectedVersion)).thenAccept(__ -> {
                         refresh(path);
                     }).thenApply(__ -> newValueObj);
-                }, executor), path);
-    }
-
-    private CompletableFuture<byte[]> serialize(String path, T value) {
-        final var future = new CompletableFuture<byte[]>();
-        executor.executeOrdered(path, () -> {
-            try {
-                future.complete(serde.serialize(path, value));
-            } catch (Throwable t) {
-                future.completeExceptionally(t);
-            }
-        });
-        return future;
+                }), path);
     }
 
     @Override
     public CompletableFuture<Void> create(String path, T value) {
-        final var future = new CompletableFuture<Void>();
-        serialize(path, value).thenCompose(content -> store.put(path, content, Optional.of(-1L)))
-            // Make sure we have the value cached before the operation is completed
-            // In addition to caching the value, we need to add a watch on the path,
-            // so when/if it changes on any other node, we are notified and we can
-            // update the cache
-            .thenCompose(__ -> objCache.get(path))
-            .whenComplete((__, ex) -> {
-                if (ex == null) {
-                    future.complete(null);
-                } else if (ex.getCause() instanceof BadVersionException) {
-                    // Use already exists exception to provide more self-explanatory error message
-                    future.completeExceptionally(new AlreadyExistsException(ex.getCause()));
-                } else {
-                    future.completeExceptionally(ex.getCause());
-                }
-            });
+        byte[] content;
+        try {
+            content = serde.serialize(path, value);
+        } catch (Throwable t) {
+            return FutureUtils.exception(t);
+        }
+
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        store.put(path, content, Optional.of(-1L))
+                .thenAccept(stat -> {
+                    // Make sure we have the value cached before the operation is completed
+                    // In addition to caching the value, we need to add a watch on the path,
+                    // so when/if it changes on any other node, we are notified and we can
+                    // update the cache
+                    objCache.get(path).whenComplete((stat2, ex) -> {
+                        if (ex == null) {
+                            future.complete(null);
+                        } else {
+                            log.error("Exception while getting path {}", path, ex);
+                            future.completeExceptionally(ex.getCause());
+                        }
+                    });
+                }).exceptionally(ex -> {
+                    if (ex.getCause() instanceof BadVersionException) {
+                        // Use already exists exception to provide more self-explanatory error message
+                        future.completeExceptionally(new AlreadyExistsException(ex.getCause()));
+                    } else {
+                        future.completeExceptionally(ex.getCause());
+                    }
+                    return null;
+                });
+
         return future;
     }
 
     @Override
     public CompletableFuture<Void> put(String path, T value, EnumSet<CreateOption> options) {
-        return serialize(path, value).thenCompose(bytes -> {
-            if (storeExtended != null) {
-                return storeExtended.put(path, bytes, Optional.empty(), options);
-            } else {
-                return store.put(path, bytes, Optional.empty());
-            }
-        }).thenAccept(__ -> {
-            if (log.isDebugEnabled()) {
-                log.debug("Refreshing path {} after put operation", path);
-            }
-            refresh(path);
-        });
+        final byte[] bytes;
+        try {
+            bytes = serde.serialize(path, value);
+        } catch (IOException e) {
+            return CompletableFuture.failedFuture(e);
+        }
+        if (storeExtended != null) {
+            return storeExtended.put(path, bytes, Optional.empty(), options).thenAccept(__ -> refresh(path));
+        } else {
+            return store.put(path, bytes, Optional.empty()).thenAccept(__ -> refresh(path));
+        }
     }
 
     @Override
@@ -358,9 +323,6 @@ public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notifica
         switch (t.getType()) {
         case Created:
         case Modified:
-            if (log.isDebugEnabled()) {
-                log.debug("Refreshing path {} for {} notification", path, t.getType());
-            }
             refresh(path);
             break;
 
@@ -392,7 +354,8 @@ public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notifica
                 final var next = backoff.next();
                 log.info("Update key {} conflicts. Retrying in {} ms. Mandatory stop: {}. Elapsed time: {} ms", key,
                         next, backoff.isMandatoryStopMade(), elapsed);
-                schedulerExecutor.schedule(() -> execute(op, key, result, backoff), next, TimeUnit.MILLISECONDS);
+                executor.schedule(() -> execute(op, key, result, backoff), next,
+                        TimeUnit.MILLISECONDS);
                 return null;
             }
             result.completeExceptionally(ex.getCause());
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index d118a792e2f..b0e4b43f700 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -26,7 +26,6 @@ import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.MoreExecutors;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import io.opentelemetry.api.OpenTelemetry;
 import java.time.Instant;
@@ -39,17 +38,16 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.common.stats.CacheMetricsCollector;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -78,9 +76,7 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
     private final CopyOnWriteArrayList<Consumer<Notification>> listeners = new CopyOnWriteArrayList<>();
     private final CopyOnWriteArrayList<Consumer<SessionEvent>> sessionListeners = new CopyOnWriteArrayList<>();
     protected final String metadataStoreName;
-    private final OrderedExecutor serDesExecutor;
-    private final ExecutorService eventExecutor;
-    private final ScheduledExecutorService schedulerExecutor;
+    protected final ScheduledExecutorService executor;
     private final AsyncLoadingCache<String, List<String>> childrenCache;
     private final AsyncLoadingCache<String, Boolean> existsCache;
     private final CopyOnWriteArrayList<MetadataCacheImpl<?>> metadataCaches = new CopyOnWriteArrayList<>();
@@ -97,21 +93,13 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
 
     protected MetadataNodeSizeStats nodeSizeStats;
 
-    protected AbstractMetadataStore(
-            String metadataStoreName, OpenTelemetry openTelemetry, MetadataNodeSizeStats nodeSizeStats,
-            int numSerDesThreads) {
+    protected AbstractMetadataStore(String metadataStoreName, OpenTelemetry openTelemetry,
+                MetadataNodeSizeStats nodeSizeStats) {
         this.nodeSizeStats = nodeSizeStats == null ? new DummyMetadataNodeSizeStats()
                 : nodeSizeStats;
-        final var namePrefix = StringUtils.isNotBlank(metadataStoreName) ? metadataStoreName
-                : getClass().getSimpleName();
-        this.eventExecutor = Executors.newSingleThreadExecutor(
-                new DefaultThreadFactory(namePrefix + "-event"));
-        this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor(
-                new DefaultThreadFactory(namePrefix + "-scheduler"));
-        this.serDesExecutor = OrderedExecutor.newBuilder()
-                .numThreads(numSerDesThreads)
-                .name(namePrefix + "-worker")
-                .build();
+        this.executor = new ScheduledThreadPoolExecutor(1,
+                new DefaultThreadFactory(
+                        StringUtils.isNotBlank(metadataStoreName) ? metadataStoreName : getClass().getSimpleName()));
         registerListener(this);
 
         this.childrenCache = Caffeine.newBuilder()
@@ -261,8 +249,7 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
         JavaType typeRef = TypeFactory.defaultInstance().constructSimpleType(clazz, null);
         String cacheName = StringUtils.isNotBlank(metadataStoreName) ? metadataStoreName : getClass().getSimpleName();
         MetadataCacheImpl<T> metadataCache =
-                new MetadataCacheImpl<T>(cacheName, this, typeRef, cacheConfig, this.serDesExecutor,
-                        this.schedulerExecutor);
+                new MetadataCacheImpl<T>(cacheName, this, typeRef, cacheConfig, this.executor);
         metadataCaches.add(metadataCache);
         return metadataCache;
     }
@@ -271,8 +258,7 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
     public <T> MetadataCache<T> getMetadataCache(TypeReference<T> typeRef, MetadataCacheConfig cacheConfig) {
         String cacheName = StringUtils.isNotBlank(metadataStoreName) ? metadataStoreName : getClass().getSimpleName();
         MetadataCacheImpl<T> metadataCache =
-                new MetadataCacheImpl<T>(cacheName, this, typeRef, cacheConfig, this.serDesExecutor,
-                        this.schedulerExecutor);
+                new MetadataCacheImpl<T>(cacheName, this, typeRef, cacheConfig, this.executor);
         metadataCaches.add(metadataCache);
         return metadataCache;
     }
@@ -282,7 +268,7 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
                                                  MetadataCacheConfig cacheConfig) {
         MetadataCacheImpl<T> metadataCache =
                 new MetadataCacheImpl<>(cacheName, this, serde, cacheConfig,
-                        this.serDesExecutor, this.schedulerExecutor);
+                        this.executor);
         metadataCaches.add(metadataCache);
         return metadataCache;
     }
@@ -362,7 +348,7 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
                 });
 
                 return null;
-            }, eventExecutor);
+            }, executor);
         } catch (RejectedExecutionException e) {
             return FutureUtil.failedFuture(e);
         }
@@ -545,7 +531,7 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
 
         // Notice listeners.
         try {
-            eventExecutor.execute(() -> {
+            executor.execute(() -> {
                 sessionListeners.forEach(l -> {
                     try {
                         l.accept(event);
@@ -570,9 +556,8 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
 
     @Override
     public void close() throws Exception {
-        MoreExecutors.shutdownAndAwaitTermination(serDesExecutor, 10, TimeUnit.SECONDS);
-        MoreExecutors.shutdownAndAwaitTermination(schedulerExecutor, 10, TimeUnit.SECONDS);
-        MoreExecutors.shutdownAndAwaitTermination(eventExecutor, 10, TimeUnit.SECONDS);
+        executor.shutdownNow();
+        executor.awaitTermination(10, TimeUnit.SECONDS);
         this.metadataStoreStats.close();
     }
 
@@ -589,30 +574,30 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
         }
     }
 
-    protected final <T> void processEvent(Consumer<T> eventProcessor, T event) {
+    /**
+     * Run the task in the executor thread and fail the future if the executor is shutting down.
+     */
+    @VisibleForTesting
+    public void execute(Runnable task, CompletableFuture<?> future) {
         try {
-            eventExecutor.execute(() -> eventProcessor.accept(event));
-        } catch (RejectedExecutionException e) {
-            log.warn("Rejected processing event {}", event);
+            executor.execute(task);
+        } catch (Throwable t) {
+            future.completeExceptionally(t);
         }
     }
 
-    protected final void scheduleDelayedTask(long delay, TimeUnit unit, Runnable task) {
-        schedulerExecutor.schedule(task, delay, unit);
-    }
-
-    protected final void safeExecuteCallback(Runnable task, Consumer<Throwable> exceptionHandler) {
+    /**
+     * Run the task in the executor thread and fail the future if the executor is shutting down.
+     */
+    @VisibleForTesting
+    public void execute(Runnable task, Supplier<List<CompletableFuture<?>>> futures) {
         try {
-            eventExecutor.execute(task);
-        } catch (Throwable t) {
-            exceptionHandler.accept(t);
+            executor.execute(task);
+        } catch (final Throwable t) {
+            futures.get().forEach(f -> f.completeExceptionally(t));
         }
     }
 
-    protected final void safeExecuteCallback(Runnable task, CompletableFuture<?> future) {
-        safeExecuteCallback(task, future::completeExceptionally);
-    }
-
     protected static String parent(String path) {
         int idx = path.lastIndexOf('/');
         if (idx <= 0) {
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java
index e1311fccfe0..3937fd712dc 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java
@@ -188,7 +188,7 @@ public class EtcdMetadataStore extends AbstractBatchedMetadataStore {
     @Override
     protected CompletableFuture<Boolean> existsFromStore(String path) {
         return kv.get(ByteSequence.from(path, StandardCharsets.UTF_8), EXISTS_GET_OPTION)
-                .thenApply(gr -> gr.getCount() == 1);
+                .thenApplyAsync(gr -> gr.getCount() == 1, executor);
     }
 
     @Override
@@ -204,8 +204,9 @@ public class EtcdMetadataStore extends AbstractBatchedMetadataStore {
             }
             return super.storePut(parent, new byte[0], Optional.empty(), EnumSet.noneOf(CreateOption.class))
                     // Then create the unique key with the version added in the path
-                    .thenCompose(
-                            stat -> super.storePut(path + stat.getVersion(), data, optExpectedVersion, options));
+                    .thenComposeAsync(
+                            stat -> super.storePut(path + stat.getVersion(), data, optExpectedVersion, options),
+                            executor);
         }
     }
 
@@ -312,7 +313,9 @@ public class EtcdMetadataStore extends AbstractBatchedMetadataStore {
                     }
                 } else {
                     log.warn("Failed to commit: {}", cause.getMessage());
-                    ops.forEach(o -> o.getFuture().completeExceptionally(ex));
+                    executor.execute(() -> {
+                        ops.forEach(o -> o.getFuture().completeExceptionally(ex));
+                    });
                 }
                 return null;
             });
@@ -323,7 +326,7 @@ public class EtcdMetadataStore extends AbstractBatchedMetadataStore {
 
     private void handleBatchOperationResult(TxnResponse txnResponse,
                                             List<MetadataOp> ops) {
-        safeExecuteCallbacks(() -> {
+        executor.execute(() -> {
             if (!txnResponse.isSucceeded()) {
                 if (ops.size() > 1) {
                     // Retry individually
@@ -401,7 +404,7 @@ public class EtcdMetadataStore extends AbstractBatchedMetadataStore {
                     }
                 }
             }
-        }, ops);
+        });
     }
 
     private synchronized CompletableFuture<Void> createLease(boolean retryOnFailure) {
@@ -441,7 +444,9 @@ public class EtcdMetadataStore extends AbstractBatchedMetadataStore {
         if (retryOnFailure) {
             future.exceptionally(ex -> {
                 log.warn("Failed to create Etcd lease. Retrying later", ex);
-                scheduleDelayedTask(1, TimeUnit.SECONDS, () -> createLease(true));
+                executor.schedule(() -> {
+                    createLease(true);
+                }, 1, TimeUnit.SECONDS);
                 return null;
             });
         }
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
index 627304b2edc..079cb3130e0 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
@@ -79,7 +79,7 @@ public class LocalMemoryMetadataStore extends AbstractMetadataStore implements M
     public LocalMemoryMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig)
             throws MetadataStoreException {
         super(metadataStoreConfig.getMetadataStoreName(), metadataStoreConfig.getOpenTelemetry(),
-                metadataStoreConfig.getNodeSizeStats(), metadataStoreConfig.getNumSerDesThreads());
+                metadataStoreConfig.getNodeSizeStats());
         String name = metadataURL.substring(MEMORY_SCHEME_IDENTIFIER.length());
         // Local means a private data set
         // update synchronizer and register sync listener
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
index 08e5478ffcc..74bddda7454 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
@@ -210,7 +210,7 @@ public class RocksdbMetadataStore extends AbstractMetadataStore {
     private RocksdbMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig)
             throws MetadataStoreException {
         super(metadataStoreConfig.getMetadataStoreName(), metadataStoreConfig.getOpenTelemetry(),
-                metadataStoreConfig.getNodeSizeStats(), metadataStoreConfig.getNumSerDesThreads());
+                metadataStoreConfig.getNodeSizeStats());
         this.metadataUrl = metadataURL;
         try {
             RocksDB.loadLibrary();
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index f56d6c6941f..5bf7e2272f0 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -119,7 +119,7 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore
 
     private void processSessionWatcher(WatchedEvent event) {
         if (sessionWatcher != null) {
-            processEvent(sessionWatcher::process, event);
+            executor.execute(() -> sessionWatcher.process(event));
         }
     }
 
@@ -245,8 +245,9 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore
                                 countsByType, totalSize, opsForLog);
 
                         // Retry with the individual operations
-                        scheduleDelayedTask(100, TimeUnit.MILLISECONDS,
-                            () -> ops.forEach(o -> batchOperation(Collections.singletonList(o))));
+                        executor.schedule(() -> {
+                            ops.forEach(o -> batchOperation(Collections.singletonList(o)));
+                        }, 100, TimeUnit.MILLISECONDS);
                     } else {
                         MetadataStoreException e = getException(code, path);
                         ops.forEach(o -> o.getFuture().completeExceptionally(e));
@@ -255,7 +256,7 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore
                 }
 
                 // Trigger all the futures in the batch
-                safeExecuteCallbacks(() -> {
+                execute(() -> {
                     for (int i = 0; i < ops.size(); i++) {
                         OpResult opr = results.get(i);
                         MetadataOp op = ops.get(i);
@@ -277,7 +278,7 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore
                                             "Operation type not supported in multi: " + op.getType()));
                             }
                         }
-                }, ops);
+                }, () -> ops.stream().map(MetadataOp::getFuture).collect(Collectors.toList()));
             }, null);
         } catch (Throwable t) {
             ops.forEach(o -> o.getFuture().completeExceptionally(new MetadataStoreException(t)));
@@ -394,7 +395,7 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore
 
         try {
             zkc.exists(path, null, (rc, path1, ctx, stat) -> {
-                safeExecuteCallback(() -> {
+                execute(() -> {
                     Code code = Code.get(rc);
                     if (code == Code.OK) {
                         future.complete(true);
@@ -420,7 +421,7 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore
 
         try {
             zkc.delete(op.getPath(), expectedVersion, (rc, path1, ctx) -> {
-                safeExecuteCallback(() -> {
+                execute(() -> {
                     Code code = Code.get(rc);
                     if (code == Code.OK) {
                         future.complete(null);
@@ -445,7 +446,7 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore
                 CreateMode createMode = getCreateMode(opPut.getOptions());
                 asyncCreateFullPathOptimistic(zkc, opPut.getPath(), opPut.getData(), createMode,
                         (rc, path1, ctx, name) -> {
-                            safeExecuteCallback(() -> {
+                            execute(() -> {
                                 Code code = Code.get(rc);
                                 if (code == Code.OK) {
                                     future.complete(new Stat(name, 0, 0, 0, createMode.isEphemeral(), true));
@@ -459,7 +460,7 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore
                         });
             } else {
                 zkc.setData(opPut.getPath(), opPut.getData(), expectedVersion, (rc, path1, ctx, stat) -> {
-                    safeExecuteCallback(() -> {
+                    execute(() -> {
                         Code code = Code.get(rc);
                         if (code == Code.OK) {
                             future.complete(getStat(path1, stat));
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
index 30989a41bd1..a9319a50fec 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
@@ -18,20 +18,16 @@
  */
 package org.apache.pulsar.metadata.impl.batching;
 
-import com.google.common.util.concurrent.MoreExecutors;
-import io.netty.util.concurrent.DefaultThreadFactory;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
@@ -42,7 +38,6 @@ import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
 import org.apache.pulsar.metadata.impl.stats.BatchMetadataStoreStats;
 import org.jctools.queues.MessagePassingQueue;
 import org.jctools.queues.MpscUnboundedArrayQueue;
-import org.jspecify.annotations.Nullable;
 
 @Slf4j
 public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore {
@@ -51,6 +46,8 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore
     private final MessagePassingQueue<MetadataOp> readOps;
     private final MessagePassingQueue<MetadataOp> writeOps;
 
+    private final AtomicBoolean flushInProgress = new AtomicBoolean(false);
+
     private final boolean enabled;
     private final int maxDelayMillis;
     protected final int maxOperations;
@@ -58,12 +55,9 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore
     private MetadataEventSynchronizer synchronizer;
     private final BatchMetadataStoreStats batchMetadataStoreStats;
     protected MetadataStoreBatchStrategy metadataStoreBatchStrategy;
-    @Nullable
-    private final ScheduledExecutorService flushExecutor;
 
     protected AbstractBatchedMetadataStore(MetadataStoreConfig conf) {
-        super(conf.getMetadataStoreName(), conf.getOpenTelemetry(), conf.getNodeSizeStats(),
-                conf.getNumSerDesThreads());
+        super(conf.getMetadataStoreName(), conf.getOpenTelemetry(), conf.getNodeSizeStats());
 
         this.enabled = conf.isBatchingEnabled();
         this.maxDelayMillis = conf.getBatchingMaxDelayMillis();
@@ -73,22 +67,18 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore
         if (enabled) {
             readOps = new MpscUnboundedArrayQueue<>(10_000);
             writeOps = new MpscUnboundedArrayQueue<>(10_000);
-            final var name = StringUtils.isNotBlank(conf.getMetadataStoreName()) ? conf.getMetadataStoreName()
-                    : getClass().getSimpleName();
-            flushExecutor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory(
-                    name + "-batch-flusher"));
-            scheduledTask = flushExecutor.scheduleAtFixedRate(this::flush, maxDelayMillis, maxDelayMillis,
-                    TimeUnit.MILLISECONDS);
+            scheduledTask =
+                    executor.scheduleAtFixedRate(this::flush, maxDelayMillis, maxDelayMillis, TimeUnit.MILLISECONDS);
         } else {
             scheduledTask = null;
             readOps = null;
             writeOps = null;
-            flushExecutor = null;
         }
 
         // update synchronizer and register sync listener
         updateMetadataEventSynchronizer(conf.getSynchronizer());
-        this.batchMetadataStoreStats = new BatchMetadataStoreStats(metadataStoreName);
+        this.batchMetadataStoreStats =
+                new BatchMetadataStoreStats(metadataStoreName, executor, conf.getOpenTelemetry());
         this.metadataStoreBatchStrategy = new DefaultMetadataStoreBatchStrategy(maxOperations, maxSize);
     }
 
@@ -106,13 +96,12 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore
                 op.getFuture().completeExceptionally(ex);
             }
             scheduledTask.cancel(true);
-            MoreExecutors.shutdownAndAwaitTermination(flushExecutor, 10, TimeUnit.SECONDS);
         }
         super.close();
         this.batchMetadataStoreStats.close();
     }
 
-    private synchronized void flush() {
+    private void flush() {
         List<MetadataOp> currentBatch;
         if (!readOps.isEmpty()) {
             while (CollectionUtils.isNotEmpty(currentBatch = metadataStoreBatchStrategy.nextBatch(readOps))) {
@@ -124,6 +113,8 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore
                 internalBatchOperation(currentBatch);
             }
         }
+
+        flushInProgress.set(false);
     }
 
     @Override
@@ -178,8 +169,8 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore
                 internalBatchOperation(Collections.singletonList(op));
                 return;
             }
-            if (queue.size() > maxOperations) {
-                flush();
+            if (queue.size() > maxOperations && flushInProgress.compareAndSet(false, true)) {
+                executor.execute(this::flush);
             }
         } else {
             internalBatchOperation(Collections.singletonList(op));
@@ -203,8 +194,4 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore
     }
 
     protected abstract void batchOperation(List<MetadataOp> ops);
-
-    protected final void safeExecuteCallbacks(Runnable runnable, List<MetadataOp> ops) {
-        safeExecuteCallback(runnable, t -> ops.forEach(op -> op.getFuture().completeExceptionally(t)));
-    }
 }
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
index 407a927bda4..d055dd7da55 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
@@ -61,7 +61,7 @@ public class OxiaMetadataStore extends AbstractMetadataStore {
     private Optional<MetadataEventSynchronizer> synchronizer;
 
     public OxiaMetadataStore(AsyncOxiaClient oxia, String identity) {
-        super("oxia-metadata", OpenTelemetry.noop(), null, 1);
+        super("oxia-metadata", OpenTelemetry.noop(), null);
         this.client = oxia;
         this.identity = identity;
         this.synchronizer = Optional.empty();
@@ -75,7 +75,7 @@ public class OxiaMetadataStore extends AbstractMetadataStore {
             boolean enableSessionWatcher)
             throws Exception {
         super("oxia-metadata", Objects.requireNonNull(metadataStoreConfig).getOpenTelemetry(),
-                metadataStoreConfig.getNodeSizeStats(), metadataStoreConfig.getNumSerDesThreads());
+                metadataStoreConfig.getNodeSizeStats());
 
         var linger = metadataStoreConfig.getBatchingMaxDelayMillis();
         if (!metadataStoreConfig.isBatchingEnabled()) {
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java
index 82cc15d8aaf..9549a8df8f9 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java
@@ -18,13 +18,23 @@
  */
 package org.apache.pulsar.metadata.impl.stats;
 
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.ObservableLongUpDownCounter;
+import io.prometheus.client.Gauge;
 import io.prometheus.client.Histogram;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public final class BatchMetadataStoreStats implements AutoCloseable {
     private static final double[] BUCKETS = new double[]{1, 5, 10, 20, 50, 100, 200, 500, 1000};
     private static final String NAME = "name";
 
+    private static final Gauge EXECUTOR_QUEUE_SIZE = Gauge
+            .build("pulsar_batch_metadata_store_executor_queue_size", "-")
+            .labelNames(NAME)
+            .register();
     private static final Histogram OPS_WAITING = Histogram
             .build("pulsar_batch_metadata_store_queue_wait_time", "-")
             .unit("ms")
@@ -44,17 +54,46 @@ public final class BatchMetadataStoreStats implements AutoCloseable {
             .register();
 
     private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final ThreadPoolExecutor executor;
     private final String metadataStoreName;
 
     private final Histogram.Child batchOpsWaitingChild;
     private final Histogram.Child batchExecuteTimeChild;
     private final Histogram.Child opsPerBatchChild;
 
-    public BatchMetadataStoreStats(String metadataStoreName) {
+    public static final String EXECUTOR_QUEUE_SIZE_METRIC_NAME = "pulsar.broker.metadata.store.executor.queue.size";
+    private final ObservableLongUpDownCounter batchMetadataStoreSizeCounter;
+
+    public BatchMetadataStoreStats(String metadataStoreName, ExecutorService executor, OpenTelemetry openTelemetry) {
+        if (executor instanceof ThreadPoolExecutor tx) {
+            this.executor = tx;
+        } else {
+            this.executor = null;
+        }
         this.metadataStoreName = metadataStoreName;
+
+        EXECUTOR_QUEUE_SIZE.setChild(new Gauge.Child() {
+            @Override
+            public double get() {
+                return getQueueSize();
+            }
+        }, metadataStoreName);
+
         this.batchOpsWaitingChild = OPS_WAITING.labels(metadataStoreName);
         this.batchExecuteTimeChild = BATCH_EXECUTE_TIME.labels(metadataStoreName);
         this.opsPerBatchChild = OPS_PER_BATCH.labels(metadataStoreName);
+
+        var meter = openTelemetry.getMeter("org.apache.pulsar");
+        var attributes = Attributes.of(MetadataStoreStats.METADATA_STORE_NAME, metadataStoreName);
+        this.batchMetadataStoreSizeCounter = meter
+                .upDownCounterBuilder(EXECUTOR_QUEUE_SIZE_METRIC_NAME)
+                .setDescription("The number of batch operations in the metadata store executor queue")
+                .setUnit("{operation}")
+                .buildWithCallback(measurement -> measurement.record(getQueueSize(), attributes));
+    }
+
+    private int getQueueSize() {
+        return executor == null ? 0 : executor.getQueue().size();
     }
 
     public void recordOpWaiting(long millis) {
@@ -72,9 +111,11 @@ public final class BatchMetadataStoreStats implements AutoCloseable {
     @Override
     public void close() throws Exception {
         if (closed.compareAndSet(false, true)) {
+            EXECUTOR_QUEUE_SIZE.remove(this.metadataStoreName);
             OPS_WAITING.remove(this.metadataStoreName);
             BATCH_EXECUTE_TIME.remove(this.metadataStoreName);
             OPS_PER_BATCH.remove(metadataStoreName);
+            batchMetadataStoreSizeCounter.close();
         }
     }
 }
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
index 0ae0b022a35..d42b2228346 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
@@ -96,7 +96,7 @@ public class MetadataStoreFactoryImplTest {
 
     public static class MyMetadataStore extends AbstractMetadataStore {
         protected MyMetadataStore() {
-            super("custom", OpenTelemetry.noop(), null, 1);
+            super("custom", OpenTelemetry.noop(), null);
         }
 
         @Override
