This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5a44e024abe8fa8d267528b425a4d479772bd21a
Author: Lari Hotari <[email protected]>
AuthorDate: Mon Feb 16 12:12:49 2026 +0200

    Reapply "[improve][meta] PIP-453: Improve the metadata store threading 
model (#25187)"
    
    This reverts commit 56cd23c301dd1a9ae6249c51439da5e142037185.
---
 conf/broker.conf                                   |   2 +
 conf/standalone.conf                               |   3 +
 pip/pip-453.md                                     |  82 +++++++++
 .../IsolatedBookieEnsemblePlacementPolicy.java     |   3 +
 .../apache/pulsar/broker/ServiceConfiguration.java |   6 +
 .../IsolatedBookieEnsemblePlacementPolicyTest.java |  34 ++--
 .../org/apache/pulsar/broker/PulsarService.java    |   2 +
 .../apache/pulsar/broker/PulsarServiceTest.java    |  65 ++++++++
 .../stats/OpenTelemetryMetadataStoreStatsTest.java |  12 --
 .../pulsar/metadata/api/MetadataStoreConfig.java   |   3 +
 .../metadata/cache/impl/MetadataCacheImpl.java     | 185 ++++++++++++---------
 .../metadata/impl/AbstractMetadataStore.java       |  77 +++++----
 .../pulsar/metadata/impl/EtcdMetadataStore.java    |  19 +--
 .../metadata/impl/LocalMemoryMetadataStore.java    |   2 +-
 .../pulsar/metadata/impl/RocksdbMetadataStore.java |   2 +-
 .../pulsar/metadata/impl/ZKMetadataStore.java      |  19 +--
 .../batching/AbstractBatchedMetadataStore.java     |  39 +++--
 .../metadata/impl/oxia/OxiaMetadataStore.java      |   4 +-
 .../impl/stats/BatchMetadataStoreStats.java        |  43 +----
 .../impl/MetadataStoreFactoryImplTest.java         |   2 +-
 20 files changed, 393 insertions(+), 211 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 8b6ec12d7f2..b2e74569f14 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -954,6 +954,8 @@ metadataStoreBatchingMaxOperations=1000
 # Maximum size of a batch
 metadataStoreBatchingMaxSizeKb=128
 
+# The number of threads used for serializing and deserializing data to and 
from the metadata store
+metadataStoreSerDesThreads=1
 
 ### --- Authentication --- ###
 
diff --git a/conf/standalone.conf b/conf/standalone.conf
index e5adf9a9637..571cc0fbbe8 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -430,6 +430,9 @@ metadataStoreBatchingMaxOperations=1000
 # Maximum size of a batch
 metadataStoreBatchingMaxSizeKb=128
 
+# The number of threads used for serializing and deserializing data to and 
from the metadata store
+metadataStoreSerDesThreads=1
+
 ### --- TLS --- ###
 # Deprecated - Use webServicePortTls and brokerServicePortTls instead
 tlsEnabled=false
diff --git a/pip/pip-453.md b/pip/pip-453.md
new file mode 100644
index 00000000000..f9109798ba7
--- /dev/null
+++ b/pip/pip-453.md
@@ -0,0 +1,82 @@
+# PIP-453: Improve the metadata store threading model
+
+# Background knowledge
+
+The `pulsar-metadata` module provides two abstractions for interacting with 
metadata stores:
+- `MetadataStore`: the wrapper on the actual underlying metadata store (e.g. 
ZooKeeper), which has caches for value and children of a given key.
+- `MetadataCache<T>`: a typed cache layer on top of `MetadataStore`, which 
performs serialization and deserialization of data between `T` and `byte[]`.
+
+The `MetadataStore` instance is unique in each broker, and is shared by 
multiple `MetadataCache<T>` instances.
+
+However, a single thread whose name starts with the metadata store name (e.g. 
`ZK-MetadataStore`) is used by implementations of them. This thread is used in 
the following tasks:
+1. Executing callbacks of APIs like `put`.
+2. Executing notification handlers, including `AbstractMetadataStore#accept`, 
which calls `accept` methods of all `MetadataCache` instances and all listeners 
registered by `MetadataStore#registerListener`.
+3. For ZooKeeper and Etcd, which support batching requests, it's used to 
schedule flushing tasks at a fixed rate, which is determined by the 
`metadataStoreBatchingMaxDelayMillis` config (default: 5 ms).
+4. Scheduling some other tasks, e.g. retrying failed operations.
+
+It should be noted that `MetadataCache` executes the compute sensitive tasks 
like serialization in the `MetadataStore` callback. When the number of metadata 
operations grows, this thread is easy to be overwhelmed. It also affects the 
topic loading, which involves many metadata operations, this thread can be 
overwhelmed and block other tasks. For example, in a production environment, 
it's observed that the `pulsar_batch_metadata_store_queue_wait_time` metric is 
high (100 ms), which should [...]
+
+# Motivation
+
+The single thread model is inefficient when there are many metadata 
operations. For example, when a broker is down and the topics owned by this 
broker will be transferred to the new owner broker. Since the new owner broker 
might never owned them before, even the `MetadataCache` caches are cold, which 
results in many metadata operations. However, the CPU-bound tasks like 
serialization and deserialization are executed in the `MetadataStore` thread, 
which makes it easy to be overwhelmed. Th [...]
+
+In a production environment, there is a case when the metadata operation rate 
increased suddenly, the `pulsar_batch_metadata_store_queue_wait_time_ms_bucket` 
metric increased to ~100 ms, which is a part of the total latency of a metadata 
operation. As a result, the total P99 get latency 
(`pulsar_metadata_store_ops_latency_ms_bucket{type="get"}`) increased to 2 
seconds.
+
+The 3rd task in the previous section is scheduled via `scheduleAtFixedRate`, 
which means if the task is not executed in time (5 ms by default), the task 
will be executed immediately again in a short time, which also burdens the 
single metadata store thread.
+
+# Goals
+
+## In Scope
+
+Improve the existing thread model to handle various tasks on metadata store, 
which could avoid a single thread being overwhelmed when there are many 
metadata operations.
+
+## Out of Scope
+
+Actually the batching mechanism introduced by 
[#13043](https://github.com/apache/pulsar/pull/13043) is harmful. The `flush` 
method, which is responsible to send a batch of metadata operations to broker, 
is called in the metadata store thread rather than the caller thread. The 
trade-off of the higher throughput is the lower latency. The benefit is limited 
because in most time the metadata operation rate is not so high. See this [test 
report](https://github.com/BewareMyPower/zookeeper-benc [...]
+
+This proposal doesn't intend to change the existing batching mechanism or 
disable it by default. It only improves the threading model to avoid the single 
thread being overwhelmed.
+
+Additionally, some code paths execute the compute intensive tasks in the 
metadata store thread directly (e.g. `store.get(path).thenApply(/* ... */)`), 
this proposal does not aim at changing them to asynchronous methods (e.g. 
`thenApplyAsync`).
+
+# High Level Design
+
+Create 4 sets of threads:
+- `<name>-event`: the original metadata store thread, which is now only 
responsible to handle notifications. This executor won't be a 
`ScheduledExecutorService` anymore.
+- `<name>-scheduler`: a single thread, which is used to schedule tasks like 
flushing and retrying failed operations.
+- `<name>-batch-flusher`: a single thread, which is used to schedule the 
flushing task at a fixed rate. It won't be created if 
`metadataStoreBatchingEnabled` is false.
+- `<name>-worker`: a fixed thread pool shared by all `MetadataCache` instances 
to execute compute intensive tasks like serialization and deserialization. The 
same path will be handled by the same thread to keep the processing order on 
the same path.
+
+Regarding the callbacks, don't switch to a different thread. This change is 
not breaking because the underlying metadata store usually executes the 
callback in a single thread (e.g. `<name>-EventThread` in ZooKeeper) like the 
single thread in the current implementation. The caller should be responsible 
to manage worker threads on the metadata operation result if the callback is 
compute intensive.
+
+The only concern is that introducing a new thread to execute callbacks allows 
waiting for the future of metadata store APIs in the callback. After this 
change, the following use case could be a dead lock:
+
+```java
+metadataStore.get(path).thenApply(__ -> metadataStore.get(otherPath).join());;
+```
+
+# Detailed Design
+
+## Public-facing Changes
+
+### Configuration
+
+Add a configurations to specify the number of worker threads for 
`MetadataCache`:
+
+```java
+    @FieldContext(
+            category = CATEGORY_SERVER,
+            doc = "The number of threads uses for serializing and 
deserializing data to and from the metadata store"
+    )
+    private int metadataStoreSerDesThreads = 1;
+```
+
+Use 1 as the default value since the serialization and deserialization tasks 
are not frequent. This separated thread pool is mainly added to avoid blocking 
the metadata store callback thread.
+
+### Metrics
+
+The `pulsar_batch_metadata_store_executor_queue_size` metric will be removed 
because the `<name>-batch-flusher` thread won't execute other tasks except for 
flushing.
+
+# Links
+
+* Mailing List discussion thread: 
https://lists.apache.org/thread/0cfdyvj96gw1sp1mo2zghl0lmsms5w1d
+* Mailing List voting thread: 
https://lists.apache.org/thread/cktj2k8myw076yggn63k8yxs5357yd61
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
index 878bbc4d654..4ef1c594be4 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
 import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
@@ -57,6 +58,8 @@ public class IsolatedBookieEnsemblePlacementPolicy extends 
RackawareEnsemblePlac
     // the secondary group.
     private ImmutablePair<Set<String>, Set<String>> defaultIsolationGroups;
 
+    @Getter
+    @VisibleForTesting
     private MetadataCache<BookiesRackConfiguration> bookieMappingCache;
 
     private static final String PULSAR_SYSTEM_TOPIC_ISOLATION_GROUP = "*";
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 44b62709fab..93670bb2377 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -498,6 +498,12 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     )
     private boolean metadataStoreAllowReadOnlyOperations;
 
+    @FieldContext(
+            category = CATEGORY_SERVER,
+            doc = "The number of threads used for serializing and 
deserializing data to and from the metadata store"
+    )
+    private int metadataStoreSerDesThreads = 1;
+
     @Deprecated
     @FieldContext(
         category = CATEGORY_SERVER,
diff --git 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
index 68f92ab416d..936b04386ff 100644
--- 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
+++ 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
@@ -22,12 +22,15 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Sets;
 import io.netty.util.HashedWheelTimer;
 import java.nio.charset.StandardCharsets;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -288,8 +291,7 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
         secondaryBookieGroup.put(BOOKIE4, 
BookieInfo.builder().rack("rack0").build());
         bookieMapping.put("group2", secondaryBookieGroup);
 
-        store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, 
jsonMapper.writeValueAsBytes(bookieMapping),
-                Optional.empty()).join();
+        updateBookieInfo(isolationPolicy, 
jsonMapper.writeValueAsBytes(bookieMapping));
 
         ensemble = isolationPolicy.newEnsemble(2, 2, 2, Collections.emptyMap(),
                 null).getResult();
@@ -340,8 +342,7 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
                 + "\": {\"rack\": \"rack0\", \"hostname\": 
\"bookie3.example.com\"}, \"" + BOOKIE4
                 + "\": {\"rack\": \"rack2\", \"hostname\": 
\"bookie4.example.com\"}}}";
 
-        store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, 
data.getBytes(StandardCharsets.UTF_8),
-                Optional.empty()).join();
+        updateBookieInfo(isolationPolicy, 
data.getBytes(StandardCharsets.UTF_8));
 
         List<BookieId> ensemble = isolationPolicy.newEnsemble(2, 2, 2, 
Collections.emptyMap(),
                 new HashSet<>()).getResult();
@@ -399,8 +400,7 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
         bookieMapping.put("group1", mainBookieGroup);
         bookieMapping.put("group2", secondaryBookieGroup);
 
-        store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, 
jsonMapper.writeValueAsBytes(bookieMapping),
-                Optional.empty()).join();
+        updateBookieInfo(isolationPolicy, 
jsonMapper.writeValueAsBytes(bookieMapping));
 
         ensemble = isolationPolicy.newEnsemble(3, 3, 3, Collections.emptyMap(),
                 new HashSet<>()).getResult();
@@ -784,8 +784,7 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
         bookieMapping.put(isolationGroup2, group2);
         bookieMapping.put(isolationGroup3, group3);
 
-        store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, 
jsonMapper.writeValueAsBytes(bookieMapping),
-                Optional.empty()).join();
+        updateBookieInfo(isolationPolicy, 
jsonMapper.writeValueAsBytes(bookieMapping));
 
         groups.setLeft(Sets.newHashSet(isolationGroup1, isolationGroup3));
         groups.setRight(Sets.newHashSet(""));
@@ -808,8 +807,7 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
         bookieMapping.put(isolationGroup1, group1);
         bookieMapping.put(isolationGroup2, group2);
 
-        store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, 
jsonMapper.writeValueAsBytes(bookieMapping),
-                Optional.empty()).join();
+        updateBookieInfo(isolationPolicy, 
jsonMapper.writeValueAsBytes(bookieMapping));
 
         groups.setLeft(Sets.newHashSet(isolationGroup1));
         groups.setRight(Sets.newHashSet(isolationGroup2));
@@ -831,12 +829,24 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
         bookieMapping.put(isolationGroup1, group1);
         bookieMapping.put(isolationGroup2, group2);
 
-        store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, 
jsonMapper.writeValueAsBytes(bookieMapping),
-                Optional.empty()).join();
+        updateBookieInfo(isolationPolicy, 
jsonMapper.writeValueAsBytes(bookieMapping));
 
         groups.setLeft(Sets.newHashSet(isolationGroup1));
         groups.setRight(Sets.newHashSet(isolationGroup2));
         blacklist = isolationPolicy.getExcludedBookiesWithIsolationGroups(2, 
groups);
         assertTrue(blacklist.isEmpty());
     }
+
+    // The policy gets the bookie info asynchronously before each query or 
update, when putting the bookie info into
+    // the metadata store, the cache needs some time to receive the 
notification and update accordingly.
+    private void updateBookieInfo(IsolatedBookieEnsemblePlacementPolicy 
isolationPolicy, byte[] bookieInfo) {
+        final var cache = isolationPolicy.getBookieMappingCache();
+        assertNotNull(cache); // the policy must have been initialized
+
+        final var key = BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH;
+        final var previousBookieInfo = cache.getIfCached(key);
+        store.put(key, bookieInfo, Optional.empty()).join();
+        Awaitility.await().atMost(Duration.ofSeconds(1)).untilAsserted(() ->
+                assertNotEquals(cache.getIfCached(key), previousBookieInfo));
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index e61fbfac566..eaebd7fa58c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -434,6 +434,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
                         .synchronizer(synchronizer)
                         .openTelemetry(openTelemetry)
                         .nodeSizeStats(new DefaultMetadataNodeSizeStats())
+                        
.numSerDesThreads(config.getMetadataStoreSerDesThreads())
                         .build());
     }
 
@@ -1305,6 +1306,7 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
                         .metadataStoreName(MetadataStoreConfig.METADATA_STORE)
                         .openTelemetry(openTelemetry)
                         .nodeSizeStats(new DefaultMetadataNodeSizeStats())
+                        
.numSerDesThreads(config.getMetadataStoreSerDesThreads())
                         .build());
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
index 6c04889d8f1..6195e9cdae5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
@@ -25,10 +25,15 @@ import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import static org.testng.AssertJUnit.assertSame;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -38,6 +43,10 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.metadata.api.MetadataCacheConfig;
+import org.apache.pulsar.metadata.api.MetadataSerde;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.Stat;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.Test;
 
@@ -339,4 +348,60 @@ public class PulsarServiceTest extends 
MockedPulsarServiceBaseTest {
             assertTrue(e instanceof PulsarClientException.TimeoutException);
         }
     }
+
+    @Test
+    public void testMetadataSerDesThreads() throws Exception {
+        final var numSerDesThreads = 5;
+        final var config = new ServiceConfiguration();
+        config.setMetadataStoreSerDesThreads(numSerDesThreads);
+        config.setClusterName("test");
+        config.setMetadataStoreUrl("memory:local");
+        config.setConfigurationMetadataStoreUrl("memory:local");
+
+        @Cleanup final var pulsar = new PulsarService(config);
+        pulsar.start();
+
+        BiConsumer<MetadataStore, String> verifier = (store, prefix) -> {
+            final var serDes = new CustomMetadataSerDes();
+            final var cache = store.getMetadataCache(prefix, serDes, 
MetadataCacheConfig.builder().build());
+            for (int i = 0; i < 100 && 
serDes.threadNameToSerializedPaths.size() < numSerDesThreads; i++) {
+                cache.create(prefix + i, "value-" + i).join();
+                final var value = cache.get(prefix + i).join();
+                assertEquals(value.orElseThrow(), "value-" + i);
+                final var newValue = cache.readModifyUpdate(prefix + i, s -> s 
+ "-updated").join();
+                assertEquals(newValue, "value-" + i + "-updated");
+                // Verify the serialization and deserialization are handled by 
the same thread
+                assertEquals(serDes.threadNameToSerializedPaths, 
serDes.threadNameToDeserializedPaths);
+            }
+            log.info("SerDes thread mapping: {}", 
serDes.threadNameToSerializedPaths);
+            assertEquals(serDes.threadNameToSerializedPaths.keySet().size(), 
numSerDesThreads);
+            // Verify a path cannot be handled by multiple threads
+            final var paths = 
serDes.threadNameToSerializedPaths.values().stream()
+                .flatMap(Set::stream).sorted().toList();
+            assertEquals(paths.stream().distinct().toList(), paths);
+        };
+
+        verifier.accept(pulsar.getLocalMetadataStore(), "/test-local/");
+        verifier.accept(pulsar.getConfigurationMetadataStore(), 
"/test-config/");
+    }
+
+    private static class CustomMetadataSerDes implements MetadataSerde<String> 
{
+
+        final Map<String, Set<String>> threadNameToSerializedPaths = new 
ConcurrentHashMap<>();
+        final Map<String, Set<String>> threadNameToDeserializedPaths = new 
ConcurrentHashMap<>();
+
+        @Override
+        public byte[] serialize(String path, String value) throws IOException{
+            
threadNameToSerializedPaths.computeIfAbsent(Thread.currentThread().getName(),
+                    __ -> ConcurrentHashMap.newKeySet()).add(path);
+            return value.getBytes();
+        }
+
+        @Override
+        public String deserialize(String path, byte[] data, Stat stat) throws 
IOException {
+            
threadNameToDeserializedPaths.computeIfAbsent(Thread.currentThread().getName(),
+                    __ -> ConcurrentHashMap.newKeySet()).add(path);
+            return new String(data);
+        }
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryMetadataStoreStatsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryMetadataStoreStatsTest.java
index 9e8bde20b88..390aa1e49e2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryMetadataStoreStatsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryMetadataStoreStatsTest.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.broker.stats;
 import static 
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
 import static org.assertj.core.api.Assertions.assertThat;
 import io.opentelemetry.api.common.Attributes;
-import java.util.concurrent.ExecutorService;
 import lombok.Cleanup;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.pulsar.broker.BrokerTestUtil;
@@ -29,7 +28,6 @@ import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.broker.testcontext.NonClosingProxyHandler;
 import org.apache.pulsar.broker.testcontext.PulsarTestContext;
 import org.apache.pulsar.metadata.api.MetadataStore;
-import org.apache.pulsar.metadata.impl.stats.BatchMetadataStoreStats;
 import org.apache.pulsar.metadata.impl.stats.MetadataStoreStats;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -53,14 +51,6 @@ public class OpenTelemetryMetadataStoreStatsTest extends 
BrokerTestBase {
         var newStats = new MetadataStoreStats(
                 localMetadataStoreName, 
pulsar.getOpenTelemetry().getOpenTelemetryService().getOpenTelemetry());
         FieldUtils.writeField(localMetadataStore, "metadataStoreStats", 
newStats, true);
-
-        var currentBatchedStats = (BatchMetadataStoreStats) 
FieldUtils.readField(localMetadataStore,
-                "batchMetadataStoreStats", true);
-        currentBatchedStats.close();
-        var currentExecutor = (ExecutorService) 
FieldUtils.readField(currentBatchedStats, "executor", true);
-        var newBatchedStats = new 
BatchMetadataStoreStats(localMetadataStoreName, currentExecutor,
-                
pulsar.getOpenTelemetry().getOpenTelemetryService().getOpenTelemetry());
-        FieldUtils.writeField(localMetadataStore, "batchMetadataStoreStats", 
newBatchedStats, true);
     }
 
     @AfterMethod(alwaysRun = true)
@@ -89,7 +79,5 @@ public class OpenTelemetryMetadataStoreStatsTest extends 
BrokerTestBase {
         var metrics = 
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
         assertMetricLongSumValue(metrics, 
MetadataStoreStats.METADATA_STORE_PUT_BYTES_COUNTER_METRIC_NAME,
                 attributes, value -> assertThat(value).isPositive());
-        assertMetricLongSumValue(metrics, 
BatchMetadataStoreStats.EXECUTOR_QUEUE_SIZE_METRIC_NAME, attributes,
-                value -> assertThat(value).isPositive());
     }
 }
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
index ef50dc87691..fcde0dce840 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
@@ -104,4 +104,7 @@ public class MetadataStoreConfig {
      * The estimator to estimate the payload length of metadata node, which 
used to limit the batch size requested.
      */
     private MetadataNodeSizeStats nodeSizeStats;
+
+    @Builder.Default
+    private final int numSerDesThreads = 1;
 }
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
index b1f0572547c..ca165f0464e 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
@@ -39,10 +39,10 @@ import java.util.function.Supplier;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.pulsar.common.stats.CacheMetricsCollector;
 import org.apache.pulsar.common.util.Backoff;
 import org.apache.pulsar.metadata.api.CacheGetResult;
-import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.MetadataCacheConfig;
 import org.apache.pulsar.metadata.api.MetadataSerde;
@@ -62,23 +62,26 @@ public class MetadataCacheImpl<T> implements 
MetadataCache<T>, Consumer<Notifica
     private final MetadataStore store;
     private final MetadataStoreExtended storeExtended;
     private final MetadataSerde<T> serde;
-    private final ScheduledExecutorService executor;
+    private final OrderedExecutor executor;
+    private final ScheduledExecutorService schedulerExecutor;
     private final MetadataCacheConfig<T> cacheConfig;
 
     private final AsyncLoadingCache<String, Optional<CacheGetResult<T>>> 
objCache;
 
     public MetadataCacheImpl(String cacheName, MetadataStore store, 
TypeReference<T> typeRef,
-                             MetadataCacheConfig<T> cacheConfig, 
ScheduledExecutorService executor) {
-        this(cacheName, store, new JSONMetadataSerdeTypeRef<>(typeRef), 
cacheConfig, executor);
+                             MetadataCacheConfig<T> cacheConfig, 
OrderedExecutor executor,
+                             ScheduledExecutorService schedulerExecutor) {
+        this(cacheName, store, new JSONMetadataSerdeTypeRef<>(typeRef), 
cacheConfig, executor, schedulerExecutor);
     }
 
     public MetadataCacheImpl(String cacheName, MetadataStore store, JavaType 
type, MetadataCacheConfig<T> cacheConfig,
-                             ScheduledExecutorService executor) {
-        this(cacheName, store, new JSONMetadataSerdeSimpleType<>(type), 
cacheConfig, executor);
+                             OrderedExecutor executor, 
ScheduledExecutorService schedulerExecutor) {
+        this(cacheName, store, new JSONMetadataSerdeSimpleType<>(type), 
cacheConfig, executor, schedulerExecutor);
     }
 
     public MetadataCacheImpl(String cacheName, MetadataStore store, 
MetadataSerde<T> serde,
-                             MetadataCacheConfig<T> cacheConfig, 
ScheduledExecutorService executor) {
+                             MetadataCacheConfig<T> cacheConfig, 
OrderedExecutor executor,
+                             ScheduledExecutorService schedulerExecutor) {
         this.store = store;
         if (store instanceof MetadataStoreExtended) {
             this.storeExtended = (MetadataStoreExtended) store;
@@ -88,6 +91,7 @@ public class MetadataCacheImpl<T> implements 
MetadataCache<T>, Consumer<Notifica
         this.serde = serde;
         this.cacheConfig = cacheConfig;
         this.executor = executor;
+        this.schedulerExecutor = schedulerExecutor;
 
         Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder();
         if (cacheConfig.getRefreshAfterWriteMillis() > 0) {
@@ -101,6 +105,9 @@ public class MetadataCacheImpl<T> implements 
MetadataCache<T>, Consumer<Notifica
                 .buildAsync(new AsyncCacheLoader<String, 
Optional<CacheGetResult<T>>>() {
                     @Override
                     public CompletableFuture<Optional<CacheGetResult<T>>> 
asyncLoad(String key, Executor executor) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("Loading key {} into metadata cache {}", 
key, cacheName);
+                        }
                         return readValueFromStore(key);
                     }
 
@@ -110,12 +117,16 @@ public class MetadataCacheImpl<T> implements 
MetadataCache<T>, Consumer<Notifica
                             Optional<CacheGetResult<T>> oldValue,
                             Executor executor) {
                         if (store instanceof AbstractMetadataStore && 
((AbstractMetadataStore) store).isConnected()) {
-                            return readValueFromStore(key).thenApply(val -> {
+                            if (log.isDebugEnabled()) {
+                                log.debug("Reloading key {} into metadata 
cache {}", key, cacheName);
+                            }
+                            final var future = readValueFromStore(key);
+                            future.thenAccept(val -> {
                                 if (cacheConfig.getAsyncReloadConsumer() != 
null) {
                                     
cacheConfig.getAsyncReloadConsumer().accept(key, val);
                                 }
-                                return val;
                             });
+                            return future;
                         } else {
                             // Do not try to refresh the cache item if we know 
that we're not connected to the
                             // metadata store
@@ -128,22 +139,46 @@ public class MetadataCacheImpl<T> implements 
MetadataCache<T>, Consumer<Notifica
     }
 
     private CompletableFuture<Optional<CacheGetResult<T>>> 
readValueFromStore(String path) {
-        return store.get(path)
-                .thenCompose(optRes -> {
-                    if (!optRes.isPresent()) {
-                        return FutureUtils.value(Optional.empty());
-                    }
-
-                    try {
-                        GetResult res = optRes.get();
-                        T obj = serde.deserialize(path, res.getValue(), 
res.getStat());
-                        return FutureUtils
-                                .value(Optional.of(new CacheGetResult<>(obj, 
res.getStat())));
-                    } catch (Throwable t) {
-                        return FutureUtils.exception(new 
ContentDeserializationException(
-                                "Failed to deserialize payload for key '" + 
path + "'", t));
-                    }
-                });
+        final var future = new 
CompletableFuture<Optional<CacheGetResult<T>>>();
+        store.get(path).thenComposeAsync(optRes -> {
+            // There could be multiple pending reads for the same path, for 
example, when a path is created,
+            // 1. The `accept` method will call `refresh`
+            // 2. The `put` method will call `refresh` after the metadata 
store put operation is done
+            // Both will call this method and the same result will be read. In 
this case, we only need to deserialize
+            // the value once.
+            if (!optRes.isPresent()) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Key {} not found in metadata store", path);
+                }
+                return FutureUtils.value(Optional.<CacheGetResult<T>>empty());
+            }
+            final var res = optRes.get();
+            final var cachedFuture = objCache.getIfPresent(path);
+            if (cachedFuture != null && cachedFuture != future) {
+                if (log.isDebugEnabled()) {
+                    log.debug("A new read on key {} is in progress or 
completed, ignore this one", path);
+                }
+                return cachedFuture;
+            }
+            try {
+                T obj = serde.deserialize(path, res.getValue(), res.getStat());
+                if (log.isDebugEnabled()) {
+                    log.debug("Deserialized value for key {} (version: {}): 
{}", path, res.getStat().getVersion(),
+                        obj);
+                }
+                return FutureUtils.value(Optional.of(new CacheGetResult<>(obj, 
res.getStat())));
+            } catch (Throwable t) {
+                return FutureUtils.exception(new 
ContentDeserializationException(
+                    "Failed to deserialize payload for key '" + path + "'", 
t));
+            }
+        }, executor.chooseThread(path)).whenComplete((result, e) -> {
+            if (e != null) {
+                future.completeExceptionally(e.getCause());
+            } else {
+                future.complete(result);
+            }
+        });
+        return future;
     }
 
     @Override
@@ -169,8 +204,9 @@ public class MetadataCacheImpl<T> implements 
MetadataCache<T>, Consumer<Notifica
 
     @Override
     public CompletableFuture<T> readModifyUpdateOrCreate(String path, 
Function<Optional<T>, T> modifyFunction) {
+        final var executor = this.executor.chooseThread(path);
         return executeWithRetry(() -> objCache.get(path)
-                .thenCompose(optEntry -> {
+                .thenComposeAsync(optEntry -> {
                     Optional<T> currentValue;
                     long expectedVersion;
 
@@ -202,13 +238,14 @@ public class MetadataCacheImpl<T> implements 
MetadataCache<T>, Consumer<Notifica
                     return store.put(path, newValue, 
Optional.of(expectedVersion)).thenAccept(__ -> {
                         refresh(path);
                     }).thenApply(__ -> newValueObj);
-                }), path);
+                }, executor), path);
     }
 
     @Override
     public CompletableFuture<T> readModifyUpdate(String path, Function<T, T> 
modifyFunction) {
+        final var executor = this.executor.chooseThread(path);
         return executeWithRetry(() -> objCache.get(path)
-                .thenCompose(optEntry -> {
+                .thenComposeAsync(optEntry -> {
                     if (!optEntry.isPresent()) {
                         return FutureUtils.exception(new 
NotFoundException(""));
                     }
@@ -231,59 +268,57 @@ public class MetadataCacheImpl<T> implements 
MetadataCache<T>, Consumer<Notifica
                     return store.put(path, newValue, 
Optional.of(expectedVersion)).thenAccept(__ -> {
                         refresh(path);
                     }).thenApply(__ -> newValueObj);
-                }), path);
+                }, executor), path);
+    }
+
+    private CompletableFuture<byte[]> serialize(String path, T value) {
+        final var future = new CompletableFuture<byte[]>();
+        executor.executeOrdered(path, () -> {
+            try {
+                future.complete(serde.serialize(path, value));
+            } catch (Throwable t) {
+                future.completeExceptionally(t);
+            }
+        });
+        return future;
     }
 
     @Override
     public CompletableFuture<Void> create(String path, T value) {
-        byte[] content;
-        try {
-            content = serde.serialize(path, value);
-        } catch (Throwable t) {
-            return FutureUtils.exception(t);
-        }
-
-        CompletableFuture<Void> future = new CompletableFuture<>();
-        store.put(path, content, Optional.of(-1L))
-                .thenAccept(stat -> {
-                    // Make sure we have the value cached before the operation 
is completed
-                    // In addition to caching the value, we need to add a 
watch on the path,
-                    // so when/if it changes on any other node, we are 
notified and we can
-                    // update the cache
-                    objCache.get(path).whenComplete((stat2, ex) -> {
-                        if (ex == null) {
-                            future.complete(null);
-                        } else {
-                            log.error("Exception while getting path {}", path, 
ex);
-                            future.completeExceptionally(ex.getCause());
-                        }
-                    });
-                }).exceptionally(ex -> {
-                    if (ex.getCause() instanceof BadVersionException) {
-                        // Use already exists exception to provide more 
self-explanatory error message
-                        future.completeExceptionally(new 
AlreadyExistsException(ex.getCause()));
-                    } else {
-                        future.completeExceptionally(ex.getCause());
-                    }
-                    return null;
-                });
-
+        final var future = new CompletableFuture<Void>();
+        serialize(path, value).thenCompose(content -> store.put(path, content, 
Optional.of(-1L)))
+            // Make sure we have the value cached before the operation is 
completed
+            // In addition to caching the value, we need to add a watch on the 
path,
+            // so when/if it changes on any other node, we are notified and we 
can
+            // update the cache
+            .thenCompose(__ -> objCache.get(path))
+            .whenComplete((__, ex) -> {
+                if (ex == null) {
+                    future.complete(null);
+                } else if (ex.getCause() instanceof BadVersionException) {
+                    // Use already exists exception to provide more 
self-explanatory error message
+                    future.completeExceptionally(new 
AlreadyExistsException(ex.getCause()));
+                } else {
+                    future.completeExceptionally(ex.getCause());
+                }
+            });
         return future;
     }
 
     @Override
     public CompletableFuture<Void> put(String path, T value, 
EnumSet<CreateOption> options) {
-        final byte[] bytes;
-        try {
-            bytes = serde.serialize(path, value);
-        } catch (IOException e) {
-            return CompletableFuture.failedFuture(e);
-        }
-        if (storeExtended != null) {
-            return storeExtended.put(path, bytes, Optional.empty(), 
options).thenAccept(__ -> refresh(path));
-        } else {
-            return store.put(path, bytes, Optional.empty()).thenAccept(__ -> 
refresh(path));
-        }
+        return serialize(path, value).thenCompose(bytes -> {
+            if (storeExtended != null) {
+                return storeExtended.put(path, bytes, Optional.empty(), 
options);
+            } else {
+                return store.put(path, bytes, Optional.empty());
+            }
+        }).thenAccept(__ -> {
+            if (log.isDebugEnabled()) {
+                log.debug("Refreshing path {} after put operation", path);
+            }
+            refresh(path);
+        });
     }
 
     @Override
@@ -323,6 +358,9 @@ public class MetadataCacheImpl<T> implements 
MetadataCache<T>, Consumer<Notifica
         switch (t.getType()) {
         case Created:
         case Modified:
+            if (log.isDebugEnabled()) {
+                log.debug("Refreshing path {} for {} notification", path, 
t.getType());
+            }
             refresh(path);
             break;
 
@@ -354,8 +392,7 @@ public class MetadataCacheImpl<T> implements 
MetadataCache<T>, Consumer<Notifica
                 final var next = backoff.next();
                 log.info("Update key {} conflicts. Retrying in {} ms. 
Mandatory stop: {}. Elapsed time: {} ms", key,
                         next, backoff.isMandatoryStopMade(), elapsed);
-                executor.schedule(() -> execute(op, key, result, backoff), 
next,
-                        TimeUnit.MILLISECONDS);
+                schedulerExecutor.schedule(() -> execute(op, key, result, 
backoff), next, TimeUnit.MILLISECONDS);
                 return null;
             }
             result.completeExceptionally(ex.getCause());
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index b0e4b43f700..d118a792e2f 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -26,6 +26,7 @@ import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.MoreExecutors;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import io.opentelemetry.api.OpenTelemetry;
 import java.time.Instant;
@@ -38,16 +39,17 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
-import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.common.stats.CacheMetricsCollector;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -76,7 +78,9 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
     private final CopyOnWriteArrayList<Consumer<Notification>> listeners = new 
CopyOnWriteArrayList<>();
     private final CopyOnWriteArrayList<Consumer<SessionEvent>> 
sessionListeners = new CopyOnWriteArrayList<>();
     protected final String metadataStoreName;
-    protected final ScheduledExecutorService executor;
+    private final OrderedExecutor serDesExecutor;
+    private final ExecutorService eventExecutor;
+    private final ScheduledExecutorService schedulerExecutor;
     private final AsyncLoadingCache<String, List<String>> childrenCache;
     private final AsyncLoadingCache<String, Boolean> existsCache;
     private final CopyOnWriteArrayList<MetadataCacheImpl<?>> metadataCaches = 
new CopyOnWriteArrayList<>();
@@ -93,13 +97,21 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
 
     protected MetadataNodeSizeStats nodeSizeStats;
 
-    protected AbstractMetadataStore(String metadataStoreName, OpenTelemetry 
openTelemetry,
-                MetadataNodeSizeStats nodeSizeStats) {
+    protected AbstractMetadataStore(
+            String metadataStoreName, OpenTelemetry openTelemetry, 
MetadataNodeSizeStats nodeSizeStats,
+            int numSerDesThreads) {
         this.nodeSizeStats = nodeSizeStats == null ? new 
DummyMetadataNodeSizeStats()
                 : nodeSizeStats;
-        this.executor = new ScheduledThreadPoolExecutor(1,
-                new DefaultThreadFactory(
-                        StringUtils.isNotBlank(metadataStoreName) ? 
metadataStoreName : getClass().getSimpleName()));
+        final var namePrefix = StringUtils.isNotBlank(metadataStoreName) ? 
metadataStoreName
+                : getClass().getSimpleName();
+        this.eventExecutor = Executors.newSingleThreadExecutor(
+                new DefaultThreadFactory(namePrefix + "-event"));
+        this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor(
+                new DefaultThreadFactory(namePrefix + "-scheduler"));
+        this.serDesExecutor = OrderedExecutor.newBuilder()
+                .numThreads(numSerDesThreads)
+                .name(namePrefix + "-worker")
+                .build();
         registerListener(this);
 
         this.childrenCache = Caffeine.newBuilder()
@@ -249,7 +261,8 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
         JavaType typeRef = 
TypeFactory.defaultInstance().constructSimpleType(clazz, null);
         String cacheName = StringUtils.isNotBlank(metadataStoreName) ? 
metadataStoreName : getClass().getSimpleName();
         MetadataCacheImpl<T> metadataCache =
-                new MetadataCacheImpl<T>(cacheName, this, typeRef, 
cacheConfig, this.executor);
+                new MetadataCacheImpl<T>(cacheName, this, typeRef, 
cacheConfig, this.serDesExecutor,
+                        this.schedulerExecutor);
         metadataCaches.add(metadataCache);
         return metadataCache;
     }
@@ -258,7 +271,8 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
     public <T> MetadataCache<T> getMetadataCache(TypeReference<T> typeRef, 
MetadataCacheConfig cacheConfig) {
         String cacheName = StringUtils.isNotBlank(metadataStoreName) ? 
metadataStoreName : getClass().getSimpleName();
         MetadataCacheImpl<T> metadataCache =
-                new MetadataCacheImpl<T>(cacheName, this, typeRef, 
cacheConfig, this.executor);
+                new MetadataCacheImpl<T>(cacheName, this, typeRef, 
cacheConfig, this.serDesExecutor,
+                        this.schedulerExecutor);
         metadataCaches.add(metadataCache);
         return metadataCache;
     }
@@ -268,7 +282,7 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
                                                  MetadataCacheConfig 
cacheConfig) {
         MetadataCacheImpl<T> metadataCache =
                 new MetadataCacheImpl<>(cacheName, this, serde, cacheConfig,
-                        this.executor);
+                        this.serDesExecutor, this.schedulerExecutor);
         metadataCaches.add(metadataCache);
         return metadataCache;
     }
@@ -348,7 +362,7 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
                 });
 
                 return null;
-            }, executor);
+            }, eventExecutor);
         } catch (RejectedExecutionException e) {
             return FutureUtil.failedFuture(e);
         }
@@ -531,7 +545,7 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
 
         // Notice listeners.
         try {
-            executor.execute(() -> {
+            eventExecutor.execute(() -> {
                 sessionListeners.forEach(l -> {
                     try {
                         l.accept(event);
@@ -556,8 +570,9 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
 
     @Override
     public void close() throws Exception {
-        executor.shutdownNow();
-        executor.awaitTermination(10, TimeUnit.SECONDS);
+        MoreExecutors.shutdownAndAwaitTermination(serDesExecutor, 10, 
TimeUnit.SECONDS);
+        MoreExecutors.shutdownAndAwaitTermination(schedulerExecutor, 10, 
TimeUnit.SECONDS);
+        MoreExecutors.shutdownAndAwaitTermination(eventExecutor, 10, 
TimeUnit.SECONDS);
         this.metadataStoreStats.close();
     }
 
@@ -574,30 +589,30 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
         }
     }
 
-    /**
-     * Run the task in the executor thread and fail the future if the executor 
is shutting down.
-     */
-    @VisibleForTesting
-    public void execute(Runnable task, CompletableFuture<?> future) {
+    protected final <T> void processEvent(Consumer<T> eventProcessor, T event) 
{
         try {
-            executor.execute(task);
-        } catch (Throwable t) {
-            future.completeExceptionally(t);
+            eventExecutor.execute(() -> eventProcessor.accept(event));
+        } catch (RejectedExecutionException e) {
+            log.warn("Rejected processing event {}", event);
         }
     }
 
-    /**
-     * Run the task in the executor thread and fail the future if the executor 
is shutting down.
-     */
-    @VisibleForTesting
-    public void execute(Runnable task, Supplier<List<CompletableFuture<?>>> 
futures) {
+    protected final void scheduleDelayedTask(long delay, TimeUnit unit, 
Runnable task) {
+        schedulerExecutor.schedule(task, delay, unit);
+    }
+
+    protected final void safeExecuteCallback(Runnable task, 
Consumer<Throwable> exceptionHandler) {
         try {
-            executor.execute(task);
-        } catch (final Throwable t) {
-            futures.get().forEach(f -> f.completeExceptionally(t));
+            eventExecutor.execute(task);
+        } catch (Throwable t) {
+            exceptionHandler.accept(t);
         }
     }
 
+    protected final void safeExecuteCallback(Runnable task, 
CompletableFuture<?> future) {
+        safeExecuteCallback(task, future::completeExceptionally);
+    }
+
     protected static String parent(String path) {
         int idx = path.lastIndexOf('/');
         if (idx <= 0) {
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java
index 3937fd712dc..e1311fccfe0 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java
@@ -188,7 +188,7 @@ public class EtcdMetadataStore extends 
AbstractBatchedMetadataStore {
     @Override
     protected CompletableFuture<Boolean> existsFromStore(String path) {
         return kv.get(ByteSequence.from(path, StandardCharsets.UTF_8), 
EXISTS_GET_OPTION)
-                .thenApplyAsync(gr -> gr.getCount() == 1, executor);
+                .thenApply(gr -> gr.getCount() == 1);
     }
 
     @Override
@@ -204,9 +204,8 @@ public class EtcdMetadataStore extends 
AbstractBatchedMetadataStore {
             }
             return super.storePut(parent, new byte[0], Optional.empty(), 
EnumSet.noneOf(CreateOption.class))
                     // Then create the unique key with the version added in 
the path
-                    .thenComposeAsync(
-                            stat -> super.storePut(path + stat.getVersion(), 
data, optExpectedVersion, options),
-                            executor);
+                    .thenCompose(
+                            stat -> super.storePut(path + stat.getVersion(), 
data, optExpectedVersion, options));
         }
     }
 
@@ -313,9 +312,7 @@ public class EtcdMetadataStore extends 
AbstractBatchedMetadataStore {
                     }
                 } else {
                     log.warn("Failed to commit: {}", cause.getMessage());
-                    executor.execute(() -> {
-                        ops.forEach(o -> 
o.getFuture().completeExceptionally(ex));
-                    });
+                    ops.forEach(o -> o.getFuture().completeExceptionally(ex));
                 }
                 return null;
             });
@@ -326,7 +323,7 @@ public class EtcdMetadataStore extends 
AbstractBatchedMetadataStore {
 
     private void handleBatchOperationResult(TxnResponse txnResponse,
                                             List<MetadataOp> ops) {
-        executor.execute(() -> {
+        safeExecuteCallbacks(() -> {
             if (!txnResponse.isSucceeded()) {
                 if (ops.size() > 1) {
                     // Retry individually
@@ -404,7 +401,7 @@ public class EtcdMetadataStore extends 
AbstractBatchedMetadataStore {
                     }
                 }
             }
-        });
+        }, ops);
     }
 
     private synchronized CompletableFuture<Void> createLease(boolean 
retryOnFailure) {
@@ -444,9 +441,7 @@ public class EtcdMetadataStore extends 
AbstractBatchedMetadataStore {
         if (retryOnFailure) {
             future.exceptionally(ex -> {
                 log.warn("Failed to create Etcd lease. Retrying later", ex);
-                executor.schedule(() -> {
-                    createLease(true);
-                }, 1, TimeUnit.SECONDS);
+                scheduleDelayedTask(1, TimeUnit.SECONDS, () -> 
createLease(true));
                 return null;
             });
         }
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
index 079cb3130e0..627304b2edc 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
@@ -79,7 +79,7 @@ public class LocalMemoryMetadataStore extends 
AbstractMetadataStore implements M
     public LocalMemoryMetadataStore(String metadataURL, MetadataStoreConfig 
metadataStoreConfig)
             throws MetadataStoreException {
         super(metadataStoreConfig.getMetadataStoreName(), 
metadataStoreConfig.getOpenTelemetry(),
-                metadataStoreConfig.getNodeSizeStats());
+                metadataStoreConfig.getNodeSizeStats(), 
metadataStoreConfig.getNumSerDesThreads());
         String name = metadataURL.substring(MEMORY_SCHEME_IDENTIFIER.length());
         // Local means a private data set
         // update synchronizer and register sync listener
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
index 74bddda7454..08e5478ffcc 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
@@ -210,7 +210,7 @@ public class RocksdbMetadataStore extends 
AbstractMetadataStore {
     private RocksdbMetadataStore(String metadataURL, MetadataStoreConfig 
metadataStoreConfig)
             throws MetadataStoreException {
         super(metadataStoreConfig.getMetadataStoreName(), 
metadataStoreConfig.getOpenTelemetry(),
-                metadataStoreConfig.getNodeSizeStats());
+                metadataStoreConfig.getNodeSizeStats(), 
metadataStoreConfig.getNumSerDesThreads());
         this.metadataUrl = metadataURL;
         try {
             RocksDB.loadLibrary();
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index 5bf7e2272f0..f56d6c6941f 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -119,7 +119,7 @@ public class ZKMetadataStore extends 
AbstractBatchedMetadataStore
 
     private void processSessionWatcher(WatchedEvent event) {
         if (sessionWatcher != null) {
-            executor.execute(() -> sessionWatcher.process(event));
+            processEvent(sessionWatcher::process, event);
         }
     }
 
@@ -245,9 +245,8 @@ public class ZKMetadataStore extends 
AbstractBatchedMetadataStore
                                 countsByType, totalSize, opsForLog);
 
                         // Retry with the individual operations
-                        executor.schedule(() -> {
-                            ops.forEach(o -> 
batchOperation(Collections.singletonList(o)));
-                        }, 100, TimeUnit.MILLISECONDS);
+                        scheduleDelayedTask(100, TimeUnit.MILLISECONDS,
+                            () -> ops.forEach(o -> 
batchOperation(Collections.singletonList(o))));
                     } else {
                         MetadataStoreException e = getException(code, path);
                         ops.forEach(o -> 
o.getFuture().completeExceptionally(e));
@@ -256,7 +255,7 @@ public class ZKMetadataStore extends 
AbstractBatchedMetadataStore
                 }
 
                 // Trigger all the futures in the batch
-                execute(() -> {
+                safeExecuteCallbacks(() -> {
                     for (int i = 0; i < ops.size(); i++) {
                         OpResult opr = results.get(i);
                         MetadataOp op = ops.get(i);
@@ -278,7 +277,7 @@ public class ZKMetadataStore extends 
AbstractBatchedMetadataStore
                                             "Operation type not supported in 
multi: " + op.getType()));
                             }
                         }
-                }, () -> 
ops.stream().map(MetadataOp::getFuture).collect(Collectors.toList()));
+                }, ops);
             }, null);
         } catch (Throwable t) {
             ops.forEach(o -> o.getFuture().completeExceptionally(new 
MetadataStoreException(t)));
@@ -395,7 +394,7 @@ public class ZKMetadataStore extends 
AbstractBatchedMetadataStore
 
         try {
             zkc.exists(path, null, (rc, path1, ctx, stat) -> {
-                execute(() -> {
+                safeExecuteCallback(() -> {
                     Code code = Code.get(rc);
                     if (code == Code.OK) {
                         future.complete(true);
@@ -421,7 +420,7 @@ public class ZKMetadataStore extends 
AbstractBatchedMetadataStore
 
         try {
             zkc.delete(op.getPath(), expectedVersion, (rc, path1, ctx) -> {
-                execute(() -> {
+                safeExecuteCallback(() -> {
                     Code code = Code.get(rc);
                     if (code == Code.OK) {
                         future.complete(null);
@@ -446,7 +445,7 @@ public class ZKMetadataStore extends 
AbstractBatchedMetadataStore
                 CreateMode createMode = getCreateMode(opPut.getOptions());
                 asyncCreateFullPathOptimistic(zkc, opPut.getPath(), 
opPut.getData(), createMode,
                         (rc, path1, ctx, name) -> {
-                            execute(() -> {
+                            safeExecuteCallback(() -> {
                                 Code code = Code.get(rc);
                                 if (code == Code.OK) {
                                     future.complete(new Stat(name, 0, 0, 0, 
createMode.isEphemeral(), true));
@@ -460,7 +459,7 @@ public class ZKMetadataStore extends 
AbstractBatchedMetadataStore
                         });
             } else {
                 zkc.setData(opPut.getPath(), opPut.getData(), expectedVersion, 
(rc, path1, ctx, stat) -> {
-                    execute(() -> {
+                    safeExecuteCallback(() -> {
                         Code code = Code.get(rc);
                         if (code == Code.OK) {
                             future.complete(getStat(path1, stat));
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
index a9319a50fec..30989a41bd1 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
@@ -18,16 +18,20 @@
  */
 package org.apache.pulsar.metadata.impl.batching;
 
+import com.google.common.util.concurrent.MoreExecutors;
+import io.netty.util.concurrent.DefaultThreadFactory;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
@@ -38,6 +42,7 @@ import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
 import org.apache.pulsar.metadata.impl.stats.BatchMetadataStoreStats;
 import org.jctools.queues.MessagePassingQueue;
 import org.jctools.queues.MpscUnboundedArrayQueue;
+import org.jspecify.annotations.Nullable;
 
 @Slf4j
 public abstract class AbstractBatchedMetadataStore extends 
AbstractMetadataStore {
@@ -46,8 +51,6 @@ public abstract class AbstractBatchedMetadataStore extends 
AbstractMetadataStore
     private final MessagePassingQueue<MetadataOp> readOps;
     private final MessagePassingQueue<MetadataOp> writeOps;
 
-    private final AtomicBoolean flushInProgress = new AtomicBoolean(false);
-
     private final boolean enabled;
     private final int maxDelayMillis;
     protected final int maxOperations;
@@ -55,9 +58,12 @@ public abstract class AbstractBatchedMetadataStore extends 
AbstractMetadataStore
     private MetadataEventSynchronizer synchronizer;
     private final BatchMetadataStoreStats batchMetadataStoreStats;
     protected MetadataStoreBatchStrategy metadataStoreBatchStrategy;
+    @Nullable
+    private final ScheduledExecutorService flushExecutor;
 
     protected AbstractBatchedMetadataStore(MetadataStoreConfig conf) {
-        super(conf.getMetadataStoreName(), conf.getOpenTelemetry(), 
conf.getNodeSizeStats());
+        super(conf.getMetadataStoreName(), conf.getOpenTelemetry(), 
conf.getNodeSizeStats(),
+                conf.getNumSerDesThreads());
 
         this.enabled = conf.isBatchingEnabled();
         this.maxDelayMillis = conf.getBatchingMaxDelayMillis();
@@ -67,18 +73,22 @@ public abstract class AbstractBatchedMetadataStore extends 
AbstractMetadataStore
         if (enabled) {
             readOps = new MpscUnboundedArrayQueue<>(10_000);
             writeOps = new MpscUnboundedArrayQueue<>(10_000);
-            scheduledTask =
-                    executor.scheduleAtFixedRate(this::flush, maxDelayMillis, 
maxDelayMillis, TimeUnit.MILLISECONDS);
+            final var name = 
StringUtils.isNotBlank(conf.getMetadataStoreName()) ? 
conf.getMetadataStoreName()
+                    : getClass().getSimpleName();
+            flushExecutor = Executors.newSingleThreadScheduledExecutor(new 
DefaultThreadFactory(
+                    name + "-batch-flusher"));
+            scheduledTask = flushExecutor.scheduleAtFixedRate(this::flush, 
maxDelayMillis, maxDelayMillis,
+                    TimeUnit.MILLISECONDS);
         } else {
             scheduledTask = null;
             readOps = null;
             writeOps = null;
+            flushExecutor = null;
         }
 
         // update synchronizer and register sync listener
         updateMetadataEventSynchronizer(conf.getSynchronizer());
-        this.batchMetadataStoreStats =
-                new BatchMetadataStoreStats(metadataStoreName, executor, 
conf.getOpenTelemetry());
+        this.batchMetadataStoreStats = new 
BatchMetadataStoreStats(metadataStoreName);
         this.metadataStoreBatchStrategy = new 
DefaultMetadataStoreBatchStrategy(maxOperations, maxSize);
     }
 
@@ -96,12 +106,13 @@ public abstract class AbstractBatchedMetadataStore extends 
AbstractMetadataStore
                 op.getFuture().completeExceptionally(ex);
             }
             scheduledTask.cancel(true);
+            MoreExecutors.shutdownAndAwaitTermination(flushExecutor, 10, 
TimeUnit.SECONDS);
         }
         super.close();
         this.batchMetadataStoreStats.close();
     }
 
-    private void flush() {
+    private synchronized void flush() {
         List<MetadataOp> currentBatch;
         if (!readOps.isEmpty()) {
             while (CollectionUtils.isNotEmpty(currentBatch = 
metadataStoreBatchStrategy.nextBatch(readOps))) {
@@ -113,8 +124,6 @@ public abstract class AbstractBatchedMetadataStore extends 
AbstractMetadataStore
                 internalBatchOperation(currentBatch);
             }
         }
-
-        flushInProgress.set(false);
     }
 
     @Override
@@ -169,8 +178,8 @@ public abstract class AbstractBatchedMetadataStore extends 
AbstractMetadataStore
                 internalBatchOperation(Collections.singletonList(op));
                 return;
             }
-            if (queue.size() > maxOperations && 
flushInProgress.compareAndSet(false, true)) {
-                executor.execute(this::flush);
+            if (queue.size() > maxOperations) {
+                flush();
             }
         } else {
             internalBatchOperation(Collections.singletonList(op));
@@ -194,4 +203,8 @@ public abstract class AbstractBatchedMetadataStore extends 
AbstractMetadataStore
     }
 
     protected abstract void batchOperation(List<MetadataOp> ops);
+
+    protected final void safeExecuteCallbacks(Runnable runnable, 
List<MetadataOp> ops) {
+        safeExecuteCallback(runnable, t -> ops.forEach(op -> 
op.getFuture().completeExceptionally(t)));
+    }
 }
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
index d055dd7da55..407a927bda4 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
@@ -61,7 +61,7 @@ public class OxiaMetadataStore extends AbstractMetadataStore {
     private Optional<MetadataEventSynchronizer> synchronizer;
 
     public OxiaMetadataStore(AsyncOxiaClient oxia, String identity) {
-        super("oxia-metadata", OpenTelemetry.noop(), null);
+        super("oxia-metadata", OpenTelemetry.noop(), null, 1);
         this.client = oxia;
         this.identity = identity;
         this.synchronizer = Optional.empty();
@@ -75,7 +75,7 @@ public class OxiaMetadataStore extends AbstractMetadataStore {
             boolean enableSessionWatcher)
             throws Exception {
         super("oxia-metadata", 
Objects.requireNonNull(metadataStoreConfig).getOpenTelemetry(),
-                metadataStoreConfig.getNodeSizeStats());
+                metadataStoreConfig.getNodeSizeStats(), 
metadataStoreConfig.getNumSerDesThreads());
 
         var linger = metadataStoreConfig.getBatchingMaxDelayMillis();
         if (!metadataStoreConfig.isBatchingEnabled()) {
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java
index 9549a8df8f9..82cc15d8aaf 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java
@@ -18,23 +18,13 @@
  */
 package org.apache.pulsar.metadata.impl.stats;
 
-import io.opentelemetry.api.OpenTelemetry;
-import io.opentelemetry.api.common.Attributes;
-import io.opentelemetry.api.metrics.ObservableLongUpDownCounter;
-import io.prometheus.client.Gauge;
 import io.prometheus.client.Histogram;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public final class BatchMetadataStoreStats implements AutoCloseable {
     private static final double[] BUCKETS = new double[]{1, 5, 10, 20, 50, 
100, 200, 500, 1000};
     private static final String NAME = "name";
 
-    private static final Gauge EXECUTOR_QUEUE_SIZE = Gauge
-            .build("pulsar_batch_metadata_store_executor_queue_size", "-")
-            .labelNames(NAME)
-            .register();
     private static final Histogram OPS_WAITING = Histogram
             .build("pulsar_batch_metadata_store_queue_wait_time", "-")
             .unit("ms")
@@ -54,46 +44,17 @@ public final class BatchMetadataStoreStats implements 
AutoCloseable {
             .register();
 
     private final AtomicBoolean closed = new AtomicBoolean(false);
-    private final ThreadPoolExecutor executor;
     private final String metadataStoreName;
 
     private final Histogram.Child batchOpsWaitingChild;
     private final Histogram.Child batchExecuteTimeChild;
     private final Histogram.Child opsPerBatchChild;
 
-    public static final String EXECUTOR_QUEUE_SIZE_METRIC_NAME = 
"pulsar.broker.metadata.store.executor.queue.size";
-    private final ObservableLongUpDownCounter batchMetadataStoreSizeCounter;
-
-    public BatchMetadataStoreStats(String metadataStoreName, ExecutorService 
executor, OpenTelemetry openTelemetry) {
-        if (executor instanceof ThreadPoolExecutor tx) {
-            this.executor = tx;
-        } else {
-            this.executor = null;
-        }
+    public BatchMetadataStoreStats(String metadataStoreName) {
         this.metadataStoreName = metadataStoreName;
-
-        EXECUTOR_QUEUE_SIZE.setChild(new Gauge.Child() {
-            @Override
-            public double get() {
-                return getQueueSize();
-            }
-        }, metadataStoreName);
-
         this.batchOpsWaitingChild = OPS_WAITING.labels(metadataStoreName);
         this.batchExecuteTimeChild = 
BATCH_EXECUTE_TIME.labels(metadataStoreName);
         this.opsPerBatchChild = OPS_PER_BATCH.labels(metadataStoreName);
-
-        var meter = openTelemetry.getMeter("org.apache.pulsar");
-        var attributes = Attributes.of(MetadataStoreStats.METADATA_STORE_NAME, 
metadataStoreName);
-        this.batchMetadataStoreSizeCounter = meter
-                .upDownCounterBuilder(EXECUTOR_QUEUE_SIZE_METRIC_NAME)
-                .setDescription("The number of batch operations in the 
metadata store executor queue")
-                .setUnit("{operation}")
-                .buildWithCallback(measurement -> 
measurement.record(getQueueSize(), attributes));
-    }
-
-    private int getQueueSize() {
-        return executor == null ? 0 : executor.getQueue().size();
     }
 
     public void recordOpWaiting(long millis) {
@@ -111,11 +72,9 @@ public final class BatchMetadataStoreStats implements 
AutoCloseable {
     @Override
     public void close() throws Exception {
         if (closed.compareAndSet(false, true)) {
-            EXECUTOR_QUEUE_SIZE.remove(this.metadataStoreName);
             OPS_WAITING.remove(this.metadataStoreName);
             BATCH_EXECUTE_TIME.remove(this.metadataStoreName);
             OPS_PER_BATCH.remove(metadataStoreName);
-            batchMetadataStoreSizeCounter.close();
         }
     }
 }
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
index d42b2228346..0ae0b022a35 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
@@ -96,7 +96,7 @@ public class MetadataStoreFactoryImplTest {
 
     public static class MyMetadataStore extends AbstractMetadataStore {
         protected MyMetadataStore() {
-            super("custom", OpenTelemetry.noop(), null);
+            super("custom", OpenTelemetry.noop(), null, 1);
         }
 
         @Override

Reply via email to