This is an automated email from the ASF dual-hosted git repository.

zhaocong pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new a6aab863b4a Revert "[improve][meta] PIP-453: Improve the metadata store threading model (#25187)"
a6aab863b4a is described below

commit a6aab863b4a86b5dcb9be21045f1333f1c4501f2
Author: coderzc <[email protected]>
AuthorDate: Fri Feb 13 10:49:13 2026 +0800

    Revert "[improve][meta] PIP-453: Improve the metadata store threading model (#25187)"
    
    This reverts commit c5f7271499e8879353139b9b1ee021a8dda111ad.
---
 conf/broker.conf                                   |   2 -
 conf/standalone.conf                               |   3 -
 pip/pip-453.md                                     |  26 ++-
 .../IsolatedBookieEnsemblePlacementPolicy.java     |   3 -
 .../apache/pulsar/broker/ServiceConfiguration.java |   6 -
 .../IsolatedBookieEnsemblePlacementPolicyTest.java |  34 ++--
 .../org/apache/pulsar/broker/PulsarService.java    |   2 -
 .../apache/pulsar/broker/PulsarServiceTest.java    |  65 --------
 .../stats/OpenTelemetryMetadataStoreStatsTest.java |  12 ++
 .../pulsar/metadata/api/MetadataStoreConfig.java   |   3 -
 .../metadata/cache/impl/MetadataCacheImpl.java     | 185 +++++++++------------
 .../metadata/impl/AbstractMetadataStore.java       |  77 ++++-----
 .../pulsar/metadata/impl/EtcdMetadataStore.java    |  19 ++-
 .../metadata/impl/LocalMemoryMetadataStore.java    |   2 +-
 .../pulsar/metadata/impl/RocksdbMetadataStore.java |   2 +-
 .../pulsar/metadata/impl/ZKMetadataStore.java      |  19 ++-
 .../batching/AbstractBatchedMetadataStore.java     |  39 ++---
 .../metadata/impl/oxia/OxiaMetadataStore.java      |   4 +-
 .../impl/stats/BatchMetadataStoreStats.java        |  43 ++++-
 .../impl/MetadataStoreFactoryImplTest.java         |   2 +-
 20 files changed, 232 insertions(+), 316 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 503a7a0e11f..85c0a133c47 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -951,8 +951,6 @@ metadataStoreBatchingMaxOperations=1000
 # Maximum size of a batch
 metadataStoreBatchingMaxSizeKb=128
 
-# The number of threads used for serializing and deserializing data to and 
from the metadata store
-metadataStoreSerDesThreads=1
 
 ### --- Authentication --- ###
 
diff --git a/conf/standalone.conf b/conf/standalone.conf
index f9c7ccb658f..77e2f37ac88 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -430,9 +430,6 @@ metadataStoreBatchingMaxOperations=1000
 # Maximum size of a batch
 metadataStoreBatchingMaxSizeKb=128
 
-# The number of threads used for serializing and deserializing data to and 
from the metadata store
-metadataStoreSerDesThreads=1
-
 ### --- TLS --- ###
 # Deprecated - Use webServicePortTls and brokerServicePortTls instead
 tlsEnabled=false
diff --git a/pip/pip-453.md b/pip/pip-453.md
index f9109798ba7..a42736b9dda 100644
--- a/pip/pip-453.md
+++ b/pip/pip-453.md
@@ -40,9 +40,8 @@ Additionally, some code paths execute the compute intensive 
tasks in the metadat
 
 # High Level Design
 
-Create 4 sets of threads:
+Create 3 set of threads:
 - `<name>-event`: the original metadata store thread, which is now only 
responsible to handle notifications. This executor won't be a 
`ScheduledExecutorService` anymore.
-- `<name>-scheduler`: a single thread, which is used to schedule tasks like 
flushing and retrying failed operations.
 - `<name>-batch-flusher`: a single thread, which is used to schedule the 
flushing task at a fixed rate. It won't be created if 
`metadataStoreBatchingEnabled` is false.
 - `<name>-worker`: a fixed thread pool shared by all `MetadataCache` instances 
to execute compute intensive tasks like serialization and deserialization. The 
same path will be handled by the same thread to keep the processing order on 
the same path.
 
@@ -54,6 +53,25 @@ The only concern is that introducing a new thread to execute 
callbacks allows wa
 metadataStore.get(path).thenApply(__ -> metadataStore.get(otherPath).join());;
 ```
 
+Other tasks like the retry on failure is executed in JVM's common 
`ForkJoinPool` by `CompletableFuture` APIs. For example:
+
+```diff
+--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
++++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+@@ -245,9 +245,8 @@ public class ZKMetadataStore extends 
AbstractBatchedMetadataStore
+                                 countsByType, totalSize, opsForLog);
+
+                         // Retry with the individual operations
+-                        executor.schedule(() -> {
+-                            ops.forEach(o -> 
batchOperation(Collections.singletonList(o)));
+-                        }, 100, TimeUnit.MILLISECONDS);
++                        CompletableFuture.delayedExecutor(100, 
TimeUnit.MILLISECONDS).execute(() ->
++                                ops.forEach(o -> 
batchOperation(Collections.singletonList(o))));
+                     } else {
+                         MetadataStoreException e = getException(code, path);
+                         ops.forEach(o -> 
o.getFuture().completeExceptionally(e));
+```
+
 # Detailed Design
 
 ## Public-facing Changes
@@ -67,11 +85,9 @@ Add a configurations to specify the number of worker threads 
for `MetadataCache`
             category = CATEGORY_SERVER,
             doc = "The number of threads uses for serializing and 
deserializing data to and from the metadata store"
     )
-    private int metadataStoreSerDesThreads = 1;
+    private int metadataStoreSerDesThreads = 
Runtime.getRuntime().availableProcessors();
 ```
 
-Use 1 as the default value since the serialization and deserialization tasks 
are not frequent. This separated thread pool is mainly added to avoid blocking 
the metadata store callback thread.
-
 ### Metrics
 
 The `pulsar_batch_metadata_store_executor_queue_size` metric will be removed 
because the `<name>-batch-flusher` thread won't execute other tasks except for 
flushing.
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
index 4ef1c594be4..878bbc4d654 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java
@@ -28,7 +28,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
 import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
@@ -58,8 +57,6 @@ public class IsolatedBookieEnsemblePlacementPolicy extends 
RackawareEnsemblePlac
     // the secondary group.
     private ImmutablePair<Set<String>, Set<String>> defaultIsolationGroups;
 
-    @Getter
-    @VisibleForTesting
     private MetadataCache<BookiesRackConfiguration> bookieMappingCache;
 
     private static final String PULSAR_SYSTEM_TOPIC_ISOLATION_GROUP = "*";
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index dede4543fc3..f7e50836583 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -490,12 +490,6 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     )
     private boolean metadataStoreAllowReadOnlyOperations;
 
-    @FieldContext(
-            category = CATEGORY_SERVER,
-            doc = "The number of threads used for serializing and 
deserializing data to and from the metadata store"
-    )
-    private int metadataStoreSerDesThreads = 1;
-
     @Deprecated
     @FieldContext(
         category = CATEGORY_SERVER,
diff --git 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
index 936b04386ff..68f92ab416d 100644
--- 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
+++ 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java
@@ -22,15 +22,12 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotEquals;
-import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Sets;
 import io.netty.util.HashedWheelTimer;
 import java.nio.charset.StandardCharsets;
-import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -291,7 +288,8 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
         secondaryBookieGroup.put(BOOKIE4, 
BookieInfo.builder().rack("rack0").build());
         bookieMapping.put("group2", secondaryBookieGroup);
 
-        updateBookieInfo(isolationPolicy, 
jsonMapper.writeValueAsBytes(bookieMapping));
+        store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, 
jsonMapper.writeValueAsBytes(bookieMapping),
+                Optional.empty()).join();
 
         ensemble = isolationPolicy.newEnsemble(2, 2, 2, Collections.emptyMap(),
                 null).getResult();
@@ -342,7 +340,8 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
                 + "\": {\"rack\": \"rack0\", \"hostname\": 
\"bookie3.example.com\"}, \"" + BOOKIE4
                 + "\": {\"rack\": \"rack2\", \"hostname\": 
\"bookie4.example.com\"}}}";
 
-        updateBookieInfo(isolationPolicy, 
data.getBytes(StandardCharsets.UTF_8));
+        store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, 
data.getBytes(StandardCharsets.UTF_8),
+                Optional.empty()).join();
 
         List<BookieId> ensemble = isolationPolicy.newEnsemble(2, 2, 2, 
Collections.emptyMap(),
                 new HashSet<>()).getResult();
@@ -400,7 +399,8 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
         bookieMapping.put("group1", mainBookieGroup);
         bookieMapping.put("group2", secondaryBookieGroup);
 
-        updateBookieInfo(isolationPolicy, 
jsonMapper.writeValueAsBytes(bookieMapping));
+        store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, 
jsonMapper.writeValueAsBytes(bookieMapping),
+                Optional.empty()).join();
 
         ensemble = isolationPolicy.newEnsemble(3, 3, 3, Collections.emptyMap(),
                 new HashSet<>()).getResult();
@@ -784,7 +784,8 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
         bookieMapping.put(isolationGroup2, group2);
         bookieMapping.put(isolationGroup3, group3);
 
-        updateBookieInfo(isolationPolicy, 
jsonMapper.writeValueAsBytes(bookieMapping));
+        store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, 
jsonMapper.writeValueAsBytes(bookieMapping),
+                Optional.empty()).join();
 
         groups.setLeft(Sets.newHashSet(isolationGroup1, isolationGroup3));
         groups.setRight(Sets.newHashSet(""));
@@ -807,7 +808,8 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
         bookieMapping.put(isolationGroup1, group1);
         bookieMapping.put(isolationGroup2, group2);
 
-        updateBookieInfo(isolationPolicy, 
jsonMapper.writeValueAsBytes(bookieMapping));
+        store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, 
jsonMapper.writeValueAsBytes(bookieMapping),
+                Optional.empty()).join();
 
         groups.setLeft(Sets.newHashSet(isolationGroup1));
         groups.setRight(Sets.newHashSet(isolationGroup2));
@@ -829,24 +831,12 @@ public class IsolatedBookieEnsemblePlacementPolicyTest {
         bookieMapping.put(isolationGroup1, group1);
         bookieMapping.put(isolationGroup2, group2);
 
-        updateBookieInfo(isolationPolicy, 
jsonMapper.writeValueAsBytes(bookieMapping));
+        store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, 
jsonMapper.writeValueAsBytes(bookieMapping),
+                Optional.empty()).join();
 
         groups.setLeft(Sets.newHashSet(isolationGroup1));
         groups.setRight(Sets.newHashSet(isolationGroup2));
         blacklist = isolationPolicy.getExcludedBookiesWithIsolationGroups(2, 
groups);
         assertTrue(blacklist.isEmpty());
     }
-
-    // The policy gets the bookie info asynchronously before each query or 
update, when putting the bookie info into
-    // the metadata store, the cache needs some time to receive the 
notification and update accordingly.
-    private void updateBookieInfo(IsolatedBookieEnsemblePlacementPolicy 
isolationPolicy, byte[] bookieInfo) {
-        final var cache = isolationPolicy.getBookieMappingCache();
-        assertNotNull(cache); // the policy must have been initialized
-
-        final var key = BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH;
-        final var previousBookieInfo = cache.getIfCached(key);
-        store.put(key, bookieInfo, Optional.empty()).join();
-        Awaitility.await().atMost(Duration.ofSeconds(1)).untilAsserted(() ->
-                assertNotEquals(cache.getIfCached(key), previousBookieInfo));
-    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index d4b69f8d5fb..56949873621 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -429,7 +429,6 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
                         .synchronizer(synchronizer)
                         .openTelemetry(openTelemetry)
                         .nodeSizeStats(new DefaultMetadataNodeSizeStats())
-                        
.numSerDesThreads(config.getMetadataStoreSerDesThreads())
                         .build());
     }
 
@@ -1300,7 +1299,6 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
                         .metadataStoreName(MetadataStoreConfig.METADATA_STORE)
                         .openTelemetry(openTelemetry)
                         .nodeSizeStats(new DefaultMetadataNodeSizeStats())
-                        
.numSerDesThreads(config.getMetadataStoreSerDesThreads())
                         .build());
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
index 6195e9cdae5..6c04889d8f1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
@@ -25,15 +25,10 @@ import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import static org.testng.AssertJUnit.assertSame;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
-import java.util.function.BiConsumer;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -43,10 +38,6 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
-import org.apache.pulsar.metadata.api.MetadataCacheConfig;
-import org.apache.pulsar.metadata.api.MetadataSerde;
-import org.apache.pulsar.metadata.api.MetadataStore;
-import org.apache.pulsar.metadata.api.Stat;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.Test;
 
@@ -348,60 +339,4 @@ public class PulsarServiceTest extends 
MockedPulsarServiceBaseTest {
             assertTrue(e instanceof PulsarClientException.TimeoutException);
         }
     }
-
-    @Test
-    public void testMetadataSerDesThreads() throws Exception {
-        final var numSerDesThreads = 5;
-        final var config = new ServiceConfiguration();
-        config.setMetadataStoreSerDesThreads(numSerDesThreads);
-        config.setClusterName("test");
-        config.setMetadataStoreUrl("memory:local");
-        config.setConfigurationMetadataStoreUrl("memory:local");
-
-        @Cleanup final var pulsar = new PulsarService(config);
-        pulsar.start();
-
-        BiConsumer<MetadataStore, String> verifier = (store, prefix) -> {
-            final var serDes = new CustomMetadataSerDes();
-            final var cache = store.getMetadataCache(prefix, serDes, 
MetadataCacheConfig.builder().build());
-            for (int i = 0; i < 100 && 
serDes.threadNameToSerializedPaths.size() < numSerDesThreads; i++) {
-                cache.create(prefix + i, "value-" + i).join();
-                final var value = cache.get(prefix + i).join();
-                assertEquals(value.orElseThrow(), "value-" + i);
-                final var newValue = cache.readModifyUpdate(prefix + i, s -> s 
+ "-updated").join();
-                assertEquals(newValue, "value-" + i + "-updated");
-                // Verify the serialization and deserialization are handled by 
the same thread
-                assertEquals(serDes.threadNameToSerializedPaths, 
serDes.threadNameToDeserializedPaths);
-            }
-            log.info("SerDes thread mapping: {}", 
serDes.threadNameToSerializedPaths);
-            assertEquals(serDes.threadNameToSerializedPaths.keySet().size(), 
numSerDesThreads);
-            // Verify a path cannot be handled by multiple threads
-            final var paths = 
serDes.threadNameToSerializedPaths.values().stream()
-                .flatMap(Set::stream).sorted().toList();
-            assertEquals(paths.stream().distinct().toList(), paths);
-        };
-
-        verifier.accept(pulsar.getLocalMetadataStore(), "/test-local/");
-        verifier.accept(pulsar.getConfigurationMetadataStore(), 
"/test-config/");
-    }
-
-    private static class CustomMetadataSerDes implements MetadataSerde<String> 
{
-
-        final Map<String, Set<String>> threadNameToSerializedPaths = new 
ConcurrentHashMap<>();
-        final Map<String, Set<String>> threadNameToDeserializedPaths = new 
ConcurrentHashMap<>();
-
-        @Override
-        public byte[] serialize(String path, String value) throws IOException{
-            
threadNameToSerializedPaths.computeIfAbsent(Thread.currentThread().getName(),
-                    __ -> ConcurrentHashMap.newKeySet()).add(path);
-            return value.getBytes();
-        }
-
-        @Override
-        public String deserialize(String path, byte[] data, Stat stat) throws 
IOException {
-            
threadNameToDeserializedPaths.computeIfAbsent(Thread.currentThread().getName(),
-                    __ -> ConcurrentHashMap.newKeySet()).add(path);
-            return new String(data);
-        }
-    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryMetadataStoreStatsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryMetadataStoreStatsTest.java
index 390aa1e49e2..9e8bde20b88 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryMetadataStoreStatsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryMetadataStoreStatsTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.stats;
 import static 
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
 import static org.assertj.core.api.Assertions.assertThat;
 import io.opentelemetry.api.common.Attributes;
+import java.util.concurrent.ExecutorService;
 import lombok.Cleanup;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.pulsar.broker.BrokerTestUtil;
@@ -28,6 +29,7 @@ import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.broker.testcontext.NonClosingProxyHandler;
 import org.apache.pulsar.broker.testcontext.PulsarTestContext;
 import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.impl.stats.BatchMetadataStoreStats;
 import org.apache.pulsar.metadata.impl.stats.MetadataStoreStats;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -51,6 +53,14 @@ public class OpenTelemetryMetadataStoreStatsTest extends 
BrokerTestBase {
         var newStats = new MetadataStoreStats(
                 localMetadataStoreName, 
pulsar.getOpenTelemetry().getOpenTelemetryService().getOpenTelemetry());
         FieldUtils.writeField(localMetadataStore, "metadataStoreStats", 
newStats, true);
+
+        var currentBatchedStats = (BatchMetadataStoreStats) 
FieldUtils.readField(localMetadataStore,
+                "batchMetadataStoreStats", true);
+        currentBatchedStats.close();
+        var currentExecutor = (ExecutorService) 
FieldUtils.readField(currentBatchedStats, "executor", true);
+        var newBatchedStats = new 
BatchMetadataStoreStats(localMetadataStoreName, currentExecutor,
+                
pulsar.getOpenTelemetry().getOpenTelemetryService().getOpenTelemetry());
+        FieldUtils.writeField(localMetadataStore, "batchMetadataStoreStats", 
newBatchedStats, true);
     }
 
     @AfterMethod(alwaysRun = true)
@@ -79,5 +89,7 @@ public class OpenTelemetryMetadataStoreStatsTest extends 
BrokerTestBase {
         var metrics = 
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
         assertMetricLongSumValue(metrics, 
MetadataStoreStats.METADATA_STORE_PUT_BYTES_COUNTER_METRIC_NAME,
                 attributes, value -> assertThat(value).isPositive());
+        assertMetricLongSumValue(metrics, 
BatchMetadataStoreStats.EXECUTOR_QUEUE_SIZE_METRIC_NAME, attributes,
+                value -> assertThat(value).isPositive());
     }
 }
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
index fcde0dce840..ef50dc87691 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
@@ -104,7 +104,4 @@ public class MetadataStoreConfig {
      * The estimator to estimate the payload length of metadata node, which 
used to limit the batch size requested.
      */
     private MetadataNodeSizeStats nodeSizeStats;
-
-    @Builder.Default
-    private final int numSerDesThreads = 1;
 }
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
index ca165f0464e..b1f0572547c 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
@@ -39,10 +39,10 @@ import java.util.function.Supplier;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
-import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.pulsar.common.stats.CacheMetricsCollector;
 import org.apache.pulsar.common.util.Backoff;
 import org.apache.pulsar.metadata.api.CacheGetResult;
+import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.MetadataCacheConfig;
 import org.apache.pulsar.metadata.api.MetadataSerde;
@@ -62,26 +62,23 @@ public class MetadataCacheImpl<T> implements 
MetadataCache<T>, Consumer<Notifica
     private final MetadataStore store;
     private final MetadataStoreExtended storeExtended;
     private final MetadataSerde<T> serde;
-    private final OrderedExecutor executor;
-    private final ScheduledExecutorService schedulerExecutor;
+    private final ScheduledExecutorService executor;
     private final MetadataCacheConfig<T> cacheConfig;
 
     private final AsyncLoadingCache<String, Optional<CacheGetResult<T>>> 
objCache;
 
     public MetadataCacheImpl(String cacheName, MetadataStore store, 
TypeReference<T> typeRef,
-                             MetadataCacheConfig<T> cacheConfig, 
OrderedExecutor executor,
-                             ScheduledExecutorService schedulerExecutor) {
-        this(cacheName, store, new JSONMetadataSerdeTypeRef<>(typeRef), 
cacheConfig, executor, schedulerExecutor);
+                             MetadataCacheConfig<T> cacheConfig, 
ScheduledExecutorService executor) {
+        this(cacheName, store, new JSONMetadataSerdeTypeRef<>(typeRef), 
cacheConfig, executor);
     }
 
     public MetadataCacheImpl(String cacheName, MetadataStore store, JavaType 
type, MetadataCacheConfig<T> cacheConfig,
-                             OrderedExecutor executor, 
ScheduledExecutorService schedulerExecutor) {
-        this(cacheName, store, new JSONMetadataSerdeSimpleType<>(type), 
cacheConfig, executor, schedulerExecutor);
+                             ScheduledExecutorService executor) {
+        this(cacheName, store, new JSONMetadataSerdeSimpleType<>(type), 
cacheConfig, executor);
     }
 
     public MetadataCacheImpl(String cacheName, MetadataStore store, 
MetadataSerde<T> serde,
-                             MetadataCacheConfig<T> cacheConfig, 
OrderedExecutor executor,
-                             ScheduledExecutorService schedulerExecutor) {
+                             MetadataCacheConfig<T> cacheConfig, 
ScheduledExecutorService executor) {
         this.store = store;
         if (store instanceof MetadataStoreExtended) {
             this.storeExtended = (MetadataStoreExtended) store;
@@ -91,7 +88,6 @@ public class MetadataCacheImpl<T> implements 
MetadataCache<T>, Consumer<Notifica
         this.serde = serde;
         this.cacheConfig = cacheConfig;
         this.executor = executor;
-        this.schedulerExecutor = schedulerExecutor;
 
         Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder();
         if (cacheConfig.getRefreshAfterWriteMillis() > 0) {
@@ -105,9 +101,6 @@ public class MetadataCacheImpl<T> implements 
MetadataCache<T>, Consumer<Notifica
                 .buildAsync(new AsyncCacheLoader<String, 
Optional<CacheGetResult<T>>>() {
                     @Override
                     public CompletableFuture<Optional<CacheGetResult<T>>> 
asyncLoad(String key, Executor executor) {
-                        if (log.isDebugEnabled()) {
-                            log.debug("Loading key {} into metadata cache {}", 
key, cacheName);
-                        }
                         return readValueFromStore(key);
                     }
 
@@ -117,16 +110,12 @@ public class MetadataCacheImpl<T> implements 
MetadataCache<T>, Consumer<Notifica
                             Optional<CacheGetResult<T>> oldValue,
                             Executor executor) {
                         if (store instanceof AbstractMetadataStore && 
((AbstractMetadataStore) store).isConnected()) {
-                            if (log.isDebugEnabled()) {
-                                log.debug("Reloading key {} into metadata 
cache {}", key, cacheName);
-                            }
-                            final var future = readValueFromStore(key);
-                            future.thenAccept(val -> {
+                            return readValueFromStore(key).thenApply(val -> {
                                 if (cacheConfig.getAsyncReloadConsumer() != 
null) {
                                     
cacheConfig.getAsyncReloadConsumer().accept(key, val);
                                 }
+                                return val;
                             });
-                            return future;
                         } else {
                             // Do not try to refresh the cache item if we know 
that we're not connected to the
                             // metadata store
@@ -139,46 +128,22 @@ public class MetadataCacheImpl<T> implements 
MetadataCache<T>, Consumer<Notifica
     }
 
     private CompletableFuture<Optional<CacheGetResult<T>>> 
readValueFromStore(String path) {
-        final var future = new 
CompletableFuture<Optional<CacheGetResult<T>>>();
-        store.get(path).thenComposeAsync(optRes -> {
-            // There could be multiple pending reads for the same path, for 
example, when a path is created,
-            // 1. The `accept` method will call `refresh`
-            // 2. The `put` method will call `refresh` after the metadata 
store put operation is done
-            // Both will call this method and the same result will be read. In 
this case, we only need to deserialize
-            // the value once.
-            if (!optRes.isPresent()) {
-                if (log.isDebugEnabled()) {
-                    log.debug("Key {} not found in metadata store", path);
-                }
-                return FutureUtils.value(Optional.<CacheGetResult<T>>empty());
-            }
-            final var res = optRes.get();
-            final var cachedFuture = objCache.getIfPresent(path);
-            if (cachedFuture != null && cachedFuture != future) {
-                if (log.isDebugEnabled()) {
-                    log.debug("A new read on key {} is in progress or 
completed, ignore this one", path);
-                }
-                return cachedFuture;
-            }
-            try {
-                T obj = serde.deserialize(path, res.getValue(), res.getStat());
-                if (log.isDebugEnabled()) {
-                    log.debug("Deserialized value for key {} (version: {}): 
{}", path, res.getStat().getVersion(),
-                        obj);
-                }
-                return FutureUtils.value(Optional.of(new CacheGetResult<>(obj, 
res.getStat())));
-            } catch (Throwable t) {
-                return FutureUtils.exception(new 
ContentDeserializationException(
-                    "Failed to deserialize payload for key '" + path + "'", 
t));
-            }
-        }, executor.chooseThread(path)).whenComplete((result, e) -> {
-            if (e != null) {
-                future.completeExceptionally(e.getCause());
-            } else {
-                future.complete(result);
-            }
-        });
-        return future;
+        return store.get(path)
+                .thenCompose(optRes -> {
+                    if (!optRes.isPresent()) {
+                        return FutureUtils.value(Optional.empty());
+                    }
+
+                    try {
+                        GetResult res = optRes.get();
+                        T obj = serde.deserialize(path, res.getValue(), 
res.getStat());
+                        return FutureUtils
+                                .value(Optional.of(new CacheGetResult<>(obj, 
res.getStat())));
+                    } catch (Throwable t) {
+                        return FutureUtils.exception(new 
ContentDeserializationException(
+                                "Failed to deserialize payload for key '" + 
path + "'", t));
+                    }
+                });
     }
 
     @Override
@@ -204,9 +169,8 @@ public class MetadataCacheImpl<T> implements 
MetadataCache<T>, Consumer<Notifica
 
     @Override
     public CompletableFuture<T> readModifyUpdateOrCreate(String path, 
Function<Optional<T>, T> modifyFunction) {
-        final var executor = this.executor.chooseThread(path);
         return executeWithRetry(() -> objCache.get(path)
-                .thenComposeAsync(optEntry -> {
+                .thenCompose(optEntry -> {
                     Optional<T> currentValue;
                     long expectedVersion;
 
@@ -238,14 +202,13 @@ public class MetadataCacheImpl<T> implements 
MetadataCache<T>, Consumer<Notifica
                     return store.put(path, newValue, 
Optional.of(expectedVersion)).thenAccept(__ -> {
                         refresh(path);
                     }).thenApply(__ -> newValueObj);
-                }, executor), path);
+                }), path);
     }
 
     @Override
     public CompletableFuture<T> readModifyUpdate(String path, Function<T, T> 
modifyFunction) {
-        final var executor = this.executor.chooseThread(path);
         return executeWithRetry(() -> objCache.get(path)
-                .thenComposeAsync(optEntry -> {
+                .thenCompose(optEntry -> {
                     if (!optEntry.isPresent()) {
                         return FutureUtils.exception(new 
NotFoundException(""));
                     }
@@ -268,57 +231,59 @@ public class MetadataCacheImpl<T> implements 
MetadataCache<T>, Consumer<Notifica
                     return store.put(path, newValue, 
Optional.of(expectedVersion)).thenAccept(__ -> {
                         refresh(path);
                     }).thenApply(__ -> newValueObj);
-                }, executor), path);
-    }
-
-    private CompletableFuture<byte[]> serialize(String path, T value) {
-        final var future = new CompletableFuture<byte[]>();
-        executor.executeOrdered(path, () -> {
-            try {
-                future.complete(serde.serialize(path, value));
-            } catch (Throwable t) {
-                future.completeExceptionally(t);
-            }
-        });
-        return future;
+                }), path);
     }
 
     @Override
     public CompletableFuture<Void> create(String path, T value) {
-        final var future = new CompletableFuture<Void>();
-        serialize(path, value).thenCompose(content -> store.put(path, content, 
Optional.of(-1L)))
-            // Make sure we have the value cached before the operation is 
completed
-            // In addition to caching the value, we need to add a watch on the 
path,
-            // so when/if it changes on any other node, we are notified and we 
can
-            // update the cache
-            .thenCompose(__ -> objCache.get(path))
-            .whenComplete((__, ex) -> {
-                if (ex == null) {
-                    future.complete(null);
-                } else if (ex.getCause() instanceof BadVersionException) {
-                    // Use already exists exception to provide more 
self-explanatory error message
-                    future.completeExceptionally(new 
AlreadyExistsException(ex.getCause()));
-                } else {
-                    future.completeExceptionally(ex.getCause());
-                }
-            });
+        byte[] content;
+        try {
+            content = serde.serialize(path, value);
+        } catch (Throwable t) {
+            return FutureUtils.exception(t);
+        }
+
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        store.put(path, content, Optional.of(-1L))
+                .thenAccept(stat -> {
+                    // Make sure we have the value cached before the operation 
is completed
+                    // In addition to caching the value, we need to add a 
watch on the path,
+                    // so when/if it changes on any other node, we are 
notified and we can
+                    // update the cache
+                    objCache.get(path).whenComplete((stat2, ex) -> {
+                        if (ex == null) {
+                            future.complete(null);
+                        } else {
+                            log.error("Exception while getting path {}", path, 
ex);
+                            future.completeExceptionally(ex.getCause());
+                        }
+                    });
+                }).exceptionally(ex -> {
+                    if (ex.getCause() instanceof BadVersionException) {
+                        // Use already exists exception to provide more 
self-explanatory error message
+                        future.completeExceptionally(new 
AlreadyExistsException(ex.getCause()));
+                    } else {
+                        future.completeExceptionally(ex.getCause());
+                    }
+                    return null;
+                });
+
         return future;
     }
 
     @Override
     public CompletableFuture<Void> put(String path, T value, 
EnumSet<CreateOption> options) {
-        return serialize(path, value).thenCompose(bytes -> {
-            if (storeExtended != null) {
-                return storeExtended.put(path, bytes, Optional.empty(), 
options);
-            } else {
-                return store.put(path, bytes, Optional.empty());
-            }
-        }).thenAccept(__ -> {
-            if (log.isDebugEnabled()) {
-                log.debug("Refreshing path {} after put operation", path);
-            }
-            refresh(path);
-        });
+        final byte[] bytes;
+        try {
+            bytes = serde.serialize(path, value);
+        } catch (IOException e) {
+            return CompletableFuture.failedFuture(e);
+        }
+        if (storeExtended != null) {
+            return storeExtended.put(path, bytes, Optional.empty(), 
options).thenAccept(__ -> refresh(path));
+        } else {
+            return store.put(path, bytes, Optional.empty()).thenAccept(__ -> 
refresh(path));
+        }
     }
 
     @Override
@@ -358,9 +323,6 @@ public class MetadataCacheImpl<T> implements 
MetadataCache<T>, Consumer<Notifica
         switch (t.getType()) {
         case Created:
         case Modified:
-            if (log.isDebugEnabled()) {
-                log.debug("Refreshing path {} for {} notification", path, 
t.getType());
-            }
             refresh(path);
             break;
 
@@ -392,7 +354,8 @@ public class MetadataCacheImpl<T> implements 
MetadataCache<T>, Consumer<Notifica
                 final var next = backoff.next();
                 log.info("Update key {} conflicts. Retrying in {} ms. 
Mandatory stop: {}. Elapsed time: {} ms", key,
                         next, backoff.isMandatoryStopMade(), elapsed);
-                schedulerExecutor.schedule(() -> execute(op, key, result, 
backoff), next, TimeUnit.MILLISECONDS);
+                executor.schedule(() -> execute(op, key, result, backoff), 
next,
+                        TimeUnit.MILLISECONDS);
                 return null;
             }
             result.completeExceptionally(ex.getCause());
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index d118a792e2f..b0e4b43f700 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -26,7 +26,6 @@ import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.MoreExecutors;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import io.opentelemetry.api.OpenTelemetry;
 import java.time.Instant;
@@ -39,17 +38,16 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.common.stats.CacheMetricsCollector;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -78,9 +76,7 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
     private final CopyOnWriteArrayList<Consumer<Notification>> listeners = new 
CopyOnWriteArrayList<>();
     private final CopyOnWriteArrayList<Consumer<SessionEvent>> 
sessionListeners = new CopyOnWriteArrayList<>();
     protected final String metadataStoreName;
-    private final OrderedExecutor serDesExecutor;
-    private final ExecutorService eventExecutor;
-    private final ScheduledExecutorService schedulerExecutor;
+    protected final ScheduledExecutorService executor;
     private final AsyncLoadingCache<String, List<String>> childrenCache;
     private final AsyncLoadingCache<String, Boolean> existsCache;
     private final CopyOnWriteArrayList<MetadataCacheImpl<?>> metadataCaches = 
new CopyOnWriteArrayList<>();
@@ -97,21 +93,13 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
 
     protected MetadataNodeSizeStats nodeSizeStats;
 
-    protected AbstractMetadataStore(
-            String metadataStoreName, OpenTelemetry openTelemetry, 
MetadataNodeSizeStats nodeSizeStats,
-            int numSerDesThreads) {
+    protected AbstractMetadataStore(String metadataStoreName, OpenTelemetry 
openTelemetry,
+                MetadataNodeSizeStats nodeSizeStats) {
         this.nodeSizeStats = nodeSizeStats == null ? new 
DummyMetadataNodeSizeStats()
                 : nodeSizeStats;
-        final var namePrefix = StringUtils.isNotBlank(metadataStoreName) ? 
metadataStoreName
-                : getClass().getSimpleName();
-        this.eventExecutor = Executors.newSingleThreadExecutor(
-                new DefaultThreadFactory(namePrefix + "-event"));
-        this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor(
-                new DefaultThreadFactory(namePrefix + "-scheduler"));
-        this.serDesExecutor = OrderedExecutor.newBuilder()
-                .numThreads(numSerDesThreads)
-                .name(namePrefix + "-worker")
-                .build();
+        this.executor = new ScheduledThreadPoolExecutor(1,
+                new DefaultThreadFactory(
+                        StringUtils.isNotBlank(metadataStoreName) ? 
metadataStoreName : getClass().getSimpleName()));
         registerListener(this);
 
         this.childrenCache = Caffeine.newBuilder()
@@ -261,8 +249,7 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
         JavaType typeRef = 
TypeFactory.defaultInstance().constructSimpleType(clazz, null);
         String cacheName = StringUtils.isNotBlank(metadataStoreName) ? 
metadataStoreName : getClass().getSimpleName();
         MetadataCacheImpl<T> metadataCache =
-                new MetadataCacheImpl<T>(cacheName, this, typeRef, 
cacheConfig, this.serDesExecutor,
-                        this.schedulerExecutor);
+                new MetadataCacheImpl<T>(cacheName, this, typeRef, 
cacheConfig, this.executor);
         metadataCaches.add(metadataCache);
         return metadataCache;
     }
@@ -271,8 +258,7 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
     public <T> MetadataCache<T> getMetadataCache(TypeReference<T> typeRef, 
MetadataCacheConfig cacheConfig) {
         String cacheName = StringUtils.isNotBlank(metadataStoreName) ? 
metadataStoreName : getClass().getSimpleName();
         MetadataCacheImpl<T> metadataCache =
-                new MetadataCacheImpl<T>(cacheName, this, typeRef, 
cacheConfig, this.serDesExecutor,
-                        this.schedulerExecutor);
+                new MetadataCacheImpl<T>(cacheName, this, typeRef, 
cacheConfig, this.executor);
         metadataCaches.add(metadataCache);
         return metadataCache;
     }
@@ -282,7 +268,7 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
                                                  MetadataCacheConfig 
cacheConfig) {
         MetadataCacheImpl<T> metadataCache =
                 new MetadataCacheImpl<>(cacheName, this, serde, cacheConfig,
-                        this.serDesExecutor, this.schedulerExecutor);
+                        this.executor);
         metadataCaches.add(metadataCache);
         return metadataCache;
     }
@@ -362,7 +348,7 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
                 });
 
                 return null;
-            }, eventExecutor);
+            }, executor);
         } catch (RejectedExecutionException e) {
             return FutureUtil.failedFuture(e);
         }
@@ -545,7 +531,7 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
 
         // Notice listeners.
         try {
-            eventExecutor.execute(() -> {
+            executor.execute(() -> {
                 sessionListeners.forEach(l -> {
                     try {
                         l.accept(event);
@@ -570,9 +556,8 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
 
     @Override
     public void close() throws Exception {
-        MoreExecutors.shutdownAndAwaitTermination(serDesExecutor, 10, 
TimeUnit.SECONDS);
-        MoreExecutors.shutdownAndAwaitTermination(schedulerExecutor, 10, 
TimeUnit.SECONDS);
-        MoreExecutors.shutdownAndAwaitTermination(eventExecutor, 10, 
TimeUnit.SECONDS);
+        executor.shutdownNow();
+        executor.awaitTermination(10, TimeUnit.SECONDS);
         this.metadataStoreStats.close();
     }
 
@@ -589,30 +574,30 @@ public abstract class AbstractMetadataStore implements 
MetadataStoreExtended, Co
         }
     }
 
-    protected final <T> void processEvent(Consumer<T> eventProcessor, T event) 
{
+    /**
+     * Run the task in the executor thread and fail the future if the executor 
is shutting down.
+     */
+    @VisibleForTesting
+    public void execute(Runnable task, CompletableFuture<?> future) {
         try {
-            eventExecutor.execute(() -> eventProcessor.accept(event));
-        } catch (RejectedExecutionException e) {
-            log.warn("Rejected processing event {}", event);
+            executor.execute(task);
+        } catch (Throwable t) {
+            future.completeExceptionally(t);
         }
     }
 
-    protected final void scheduleDelayedTask(long delay, TimeUnit unit, 
Runnable task) {
-        schedulerExecutor.schedule(task, delay, unit);
-    }
-
-    protected final void safeExecuteCallback(Runnable task, 
Consumer<Throwable> exceptionHandler) {
+    /**
+     * Run the task in the executor thread and fail the future if the executor 
is shutting down.
+     */
+    @VisibleForTesting
+    public void execute(Runnable task, Supplier<List<CompletableFuture<?>>> 
futures) {
         try {
-            eventExecutor.execute(task);
-        } catch (Throwable t) {
-            exceptionHandler.accept(t);
+            executor.execute(task);
+        } catch (final Throwable t) {
+            futures.get().forEach(f -> f.completeExceptionally(t));
         }
     }
 
-    protected final void safeExecuteCallback(Runnable task, 
CompletableFuture<?> future) {
-        safeExecuteCallback(task, future::completeExceptionally);
-    }
-
     protected static String parent(String path) {
         int idx = path.lastIndexOf('/');
         if (idx <= 0) {
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java
index e1311fccfe0..3937fd712dc 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java
@@ -188,7 +188,7 @@ public class EtcdMetadataStore extends 
AbstractBatchedMetadataStore {
     @Override
     protected CompletableFuture<Boolean> existsFromStore(String path) {
         return kv.get(ByteSequence.from(path, StandardCharsets.UTF_8), 
EXISTS_GET_OPTION)
-                .thenApply(gr -> gr.getCount() == 1);
+                .thenApplyAsync(gr -> gr.getCount() == 1, executor);
     }
 
     @Override
@@ -204,8 +204,9 @@ public class EtcdMetadataStore extends 
AbstractBatchedMetadataStore {
             }
             return super.storePut(parent, new byte[0], Optional.empty(), 
EnumSet.noneOf(CreateOption.class))
                     // Then create the unique key with the version added in 
the path
-                    .thenCompose(
-                            stat -> super.storePut(path + stat.getVersion(), 
data, optExpectedVersion, options));
+                    .thenComposeAsync(
+                            stat -> super.storePut(path + stat.getVersion(), 
data, optExpectedVersion, options),
+                            executor);
         }
     }
 
@@ -312,7 +313,9 @@ public class EtcdMetadataStore extends 
AbstractBatchedMetadataStore {
                     }
                 } else {
                     log.warn("Failed to commit: {}", cause.getMessage());
-                    ops.forEach(o -> o.getFuture().completeExceptionally(ex));
+                    executor.execute(() -> {
+                        ops.forEach(o -> 
o.getFuture().completeExceptionally(ex));
+                    });
                 }
                 return null;
             });
@@ -323,7 +326,7 @@ public class EtcdMetadataStore extends 
AbstractBatchedMetadataStore {
 
     private void handleBatchOperationResult(TxnResponse txnResponse,
                                             List<MetadataOp> ops) {
-        safeExecuteCallbacks(() -> {
+        executor.execute(() -> {
             if (!txnResponse.isSucceeded()) {
                 if (ops.size() > 1) {
                     // Retry individually
@@ -401,7 +404,7 @@ public class EtcdMetadataStore extends 
AbstractBatchedMetadataStore {
                     }
                 }
             }
-        }, ops);
+        });
     }
 
     private synchronized CompletableFuture<Void> createLease(boolean 
retryOnFailure) {
@@ -441,7 +444,9 @@ public class EtcdMetadataStore extends 
AbstractBatchedMetadataStore {
         if (retryOnFailure) {
             future.exceptionally(ex -> {
                 log.warn("Failed to create Etcd lease. Retrying later", ex);
-                scheduleDelayedTask(1, TimeUnit.SECONDS, () -> 
createLease(true));
+                executor.schedule(() -> {
+                    createLease(true);
+                }, 1, TimeUnit.SECONDS);
                 return null;
             });
         }
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
index 627304b2edc..079cb3130e0 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
@@ -79,7 +79,7 @@ public class LocalMemoryMetadataStore extends 
AbstractMetadataStore implements M
     public LocalMemoryMetadataStore(String metadataURL, MetadataStoreConfig 
metadataStoreConfig)
             throws MetadataStoreException {
         super(metadataStoreConfig.getMetadataStoreName(), 
metadataStoreConfig.getOpenTelemetry(),
-                metadataStoreConfig.getNodeSizeStats(), 
metadataStoreConfig.getNumSerDesThreads());
+                metadataStoreConfig.getNodeSizeStats());
         String name = metadataURL.substring(MEMORY_SCHEME_IDENTIFIER.length());
         // Local means a private data set
         // update synchronizer and register sync listener
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
index 08e5478ffcc..74bddda7454 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
@@ -210,7 +210,7 @@ public class RocksdbMetadataStore extends 
AbstractMetadataStore {
     private RocksdbMetadataStore(String metadataURL, MetadataStoreConfig 
metadataStoreConfig)
             throws MetadataStoreException {
         super(metadataStoreConfig.getMetadataStoreName(), 
metadataStoreConfig.getOpenTelemetry(),
-                metadataStoreConfig.getNodeSizeStats(), 
metadataStoreConfig.getNumSerDesThreads());
+                metadataStoreConfig.getNodeSizeStats());
         this.metadataUrl = metadataURL;
         try {
             RocksDB.loadLibrary();
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index f56d6c6941f..5bf7e2272f0 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -119,7 +119,7 @@ public class ZKMetadataStore extends 
AbstractBatchedMetadataStore
 
     private void processSessionWatcher(WatchedEvent event) {
         if (sessionWatcher != null) {
-            processEvent(sessionWatcher::process, event);
+            executor.execute(() -> sessionWatcher.process(event));
         }
     }
 
@@ -245,8 +245,9 @@ public class ZKMetadataStore extends 
AbstractBatchedMetadataStore
                                 countsByType, totalSize, opsForLog);
 
                         // Retry with the individual operations
-                        scheduleDelayedTask(100, TimeUnit.MILLISECONDS,
-                            () -> ops.forEach(o -> 
batchOperation(Collections.singletonList(o))));
+                        executor.schedule(() -> {
+                            ops.forEach(o -> 
batchOperation(Collections.singletonList(o)));
+                        }, 100, TimeUnit.MILLISECONDS);
                     } else {
                         MetadataStoreException e = getException(code, path);
                         ops.forEach(o -> 
o.getFuture().completeExceptionally(e));
@@ -255,7 +256,7 @@ public class ZKMetadataStore extends 
AbstractBatchedMetadataStore
                 }
 
                 // Trigger all the futures in the batch
-                safeExecuteCallbacks(() -> {
+                execute(() -> {
                     for (int i = 0; i < ops.size(); i++) {
                         OpResult opr = results.get(i);
                         MetadataOp op = ops.get(i);
@@ -277,7 +278,7 @@ public class ZKMetadataStore extends 
AbstractBatchedMetadataStore
                                             "Operation type not supported in 
multi: " + op.getType()));
                             }
                         }
-                }, ops);
+                }, () -> 
ops.stream().map(MetadataOp::getFuture).collect(Collectors.toList()));
             }, null);
         } catch (Throwable t) {
             ops.forEach(o -> o.getFuture().completeExceptionally(new 
MetadataStoreException(t)));
@@ -394,7 +395,7 @@ public class ZKMetadataStore extends 
AbstractBatchedMetadataStore
 
         try {
             zkc.exists(path, null, (rc, path1, ctx, stat) -> {
-                safeExecuteCallback(() -> {
+                execute(() -> {
                     Code code = Code.get(rc);
                     if (code == Code.OK) {
                         future.complete(true);
@@ -420,7 +421,7 @@ public class ZKMetadataStore extends 
AbstractBatchedMetadataStore
 
         try {
             zkc.delete(op.getPath(), expectedVersion, (rc, path1, ctx) -> {
-                safeExecuteCallback(() -> {
+                execute(() -> {
                     Code code = Code.get(rc);
                     if (code == Code.OK) {
                         future.complete(null);
@@ -445,7 +446,7 @@ public class ZKMetadataStore extends 
AbstractBatchedMetadataStore
                 CreateMode createMode = getCreateMode(opPut.getOptions());
                 asyncCreateFullPathOptimistic(zkc, opPut.getPath(), 
opPut.getData(), createMode,
                         (rc, path1, ctx, name) -> {
-                            safeExecuteCallback(() -> {
+                            execute(() -> {
                                 Code code = Code.get(rc);
                                 if (code == Code.OK) {
                                     future.complete(new Stat(name, 0, 0, 0, 
createMode.isEphemeral(), true));
@@ -459,7 +460,7 @@ public class ZKMetadataStore extends 
AbstractBatchedMetadataStore
                         });
             } else {
                 zkc.setData(opPut.getPath(), opPut.getData(), expectedVersion, 
(rc, path1, ctx, stat) -> {
-                    safeExecuteCallback(() -> {
+                    execute(() -> {
                         Code code = Code.get(rc);
                         if (code == Code.OK) {
                             future.complete(getStat(path1, stat));
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
index 30989a41bd1..a9319a50fec 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
@@ -18,20 +18,16 @@
  */
 package org.apache.pulsar.metadata.impl.batching;
 
-import com.google.common.util.concurrent.MoreExecutors;
-import io.netty.util.concurrent.DefaultThreadFactory;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
@@ -42,7 +38,6 @@ import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
 import org.apache.pulsar.metadata.impl.stats.BatchMetadataStoreStats;
 import org.jctools.queues.MessagePassingQueue;
 import org.jctools.queues.MpscUnboundedArrayQueue;
-import org.jspecify.annotations.Nullable;
 
 @Slf4j
 public abstract class AbstractBatchedMetadataStore extends 
AbstractMetadataStore {
@@ -51,6 +46,8 @@ public abstract class AbstractBatchedMetadataStore extends 
AbstractMetadataStore
     private final MessagePassingQueue<MetadataOp> readOps;
     private final MessagePassingQueue<MetadataOp> writeOps;
 
+    private final AtomicBoolean flushInProgress = new AtomicBoolean(false);
+
     private final boolean enabled;
     private final int maxDelayMillis;
     protected final int maxOperations;
@@ -58,12 +55,9 @@ public abstract class AbstractBatchedMetadataStore extends 
AbstractMetadataStore
     private MetadataEventSynchronizer synchronizer;
     private final BatchMetadataStoreStats batchMetadataStoreStats;
     protected MetadataStoreBatchStrategy metadataStoreBatchStrategy;
-    @Nullable
-    private final ScheduledExecutorService flushExecutor;
 
     protected AbstractBatchedMetadataStore(MetadataStoreConfig conf) {
-        super(conf.getMetadataStoreName(), conf.getOpenTelemetry(), 
conf.getNodeSizeStats(),
-                conf.getNumSerDesThreads());
+        super(conf.getMetadataStoreName(), conf.getOpenTelemetry(), 
conf.getNodeSizeStats());
 
         this.enabled = conf.isBatchingEnabled();
         this.maxDelayMillis = conf.getBatchingMaxDelayMillis();
@@ -73,22 +67,18 @@ public abstract class AbstractBatchedMetadataStore extends 
AbstractMetadataStore
         if (enabled) {
             readOps = new MpscUnboundedArrayQueue<>(10_000);
             writeOps = new MpscUnboundedArrayQueue<>(10_000);
-            final var name = 
StringUtils.isNotBlank(conf.getMetadataStoreName()) ? 
conf.getMetadataStoreName()
-                    : getClass().getSimpleName();
-            flushExecutor = Executors.newSingleThreadScheduledExecutor(new 
DefaultThreadFactory(
-                    name + "-batch-flusher"));
-            scheduledTask = flushExecutor.scheduleAtFixedRate(this::flush, 
maxDelayMillis, maxDelayMillis,
-                    TimeUnit.MILLISECONDS);
+            scheduledTask =
+                    executor.scheduleAtFixedRate(this::flush, maxDelayMillis, 
maxDelayMillis, TimeUnit.MILLISECONDS);
         } else {
             scheduledTask = null;
             readOps = null;
             writeOps = null;
-            flushExecutor = null;
         }
 
         // update synchronizer and register sync listener
         updateMetadataEventSynchronizer(conf.getSynchronizer());
-        this.batchMetadataStoreStats = new 
BatchMetadataStoreStats(metadataStoreName);
+        this.batchMetadataStoreStats =
+                new BatchMetadataStoreStats(metadataStoreName, executor, 
conf.getOpenTelemetry());
         this.metadataStoreBatchStrategy = new 
DefaultMetadataStoreBatchStrategy(maxOperations, maxSize);
     }
 
@@ -106,13 +96,12 @@ public abstract class AbstractBatchedMetadataStore extends 
AbstractMetadataStore
                 op.getFuture().completeExceptionally(ex);
             }
             scheduledTask.cancel(true);
-            MoreExecutors.shutdownAndAwaitTermination(flushExecutor, 10, 
TimeUnit.SECONDS);
         }
         super.close();
         this.batchMetadataStoreStats.close();
     }
 
-    private synchronized void flush() {
+    private void flush() {
         List<MetadataOp> currentBatch;
         if (!readOps.isEmpty()) {
             while (CollectionUtils.isNotEmpty(currentBatch = 
metadataStoreBatchStrategy.nextBatch(readOps))) {
@@ -124,6 +113,8 @@ public abstract class AbstractBatchedMetadataStore extends 
AbstractMetadataStore
                 internalBatchOperation(currentBatch);
             }
         }
+
+        flushInProgress.set(false);
     }
 
     @Override
@@ -178,8 +169,8 @@ public abstract class AbstractBatchedMetadataStore extends 
AbstractMetadataStore
                 internalBatchOperation(Collections.singletonList(op));
                 return;
             }
-            if (queue.size() > maxOperations) {
-                flush();
+            if (queue.size() > maxOperations && 
flushInProgress.compareAndSet(false, true)) {
+                executor.execute(this::flush);
             }
         } else {
             internalBatchOperation(Collections.singletonList(op));
@@ -203,8 +194,4 @@ public abstract class AbstractBatchedMetadataStore extends 
AbstractMetadataStore
     }
 
     protected abstract void batchOperation(List<MetadataOp> ops);
-
-    protected final void safeExecuteCallbacks(Runnable runnable, 
List<MetadataOp> ops) {
-        safeExecuteCallback(runnable, t -> ops.forEach(op -> 
op.getFuture().completeExceptionally(t)));
-    }
 }
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
index 407a927bda4..d055dd7da55 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
@@ -61,7 +61,7 @@ public class OxiaMetadataStore extends AbstractMetadataStore {
     private Optional<MetadataEventSynchronizer> synchronizer;
 
     public OxiaMetadataStore(AsyncOxiaClient oxia, String identity) {
-        super("oxia-metadata", OpenTelemetry.noop(), null, 1);
+        super("oxia-metadata", OpenTelemetry.noop(), null);
         this.client = oxia;
         this.identity = identity;
         this.synchronizer = Optional.empty();
@@ -75,7 +75,7 @@ public class OxiaMetadataStore extends AbstractMetadataStore {
             boolean enableSessionWatcher)
             throws Exception {
         super("oxia-metadata", 
Objects.requireNonNull(metadataStoreConfig).getOpenTelemetry(),
-                metadataStoreConfig.getNodeSizeStats(), 
metadataStoreConfig.getNumSerDesThreads());
+                metadataStoreConfig.getNodeSizeStats());
 
         var linger = metadataStoreConfig.getBatchingMaxDelayMillis();
         if (!metadataStoreConfig.isBatchingEnabled()) {
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java
index 82cc15d8aaf..9549a8df8f9 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/BatchMetadataStoreStats.java
@@ -18,13 +18,23 @@
  */
 package org.apache.pulsar.metadata.impl.stats;
 
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.ObservableLongUpDownCounter;
+import io.prometheus.client.Gauge;
 import io.prometheus.client.Histogram;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public final class BatchMetadataStoreStats implements AutoCloseable {
     private static final double[] BUCKETS = new double[]{1, 5, 10, 20, 50, 
100, 200, 500, 1000};
     private static final String NAME = "name";
 
+    private static final Gauge EXECUTOR_QUEUE_SIZE = Gauge
+            .build("pulsar_batch_metadata_store_executor_queue_size", "-")
+            .labelNames(NAME)
+            .register();
     private static final Histogram OPS_WAITING = Histogram
             .build("pulsar_batch_metadata_store_queue_wait_time", "-")
             .unit("ms")
@@ -44,17 +54,46 @@ public final class BatchMetadataStoreStats implements 
AutoCloseable {
             .register();
 
     private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final ThreadPoolExecutor executor;
     private final String metadataStoreName;
 
     private final Histogram.Child batchOpsWaitingChild;
     private final Histogram.Child batchExecuteTimeChild;
     private final Histogram.Child opsPerBatchChild;
 
-    public BatchMetadataStoreStats(String metadataStoreName) {
+    public static final String EXECUTOR_QUEUE_SIZE_METRIC_NAME = 
"pulsar.broker.metadata.store.executor.queue.size";
+    private final ObservableLongUpDownCounter batchMetadataStoreSizeCounter;
+
+    public BatchMetadataStoreStats(String metadataStoreName, ExecutorService 
executor, OpenTelemetry openTelemetry) {
+        if (executor instanceof ThreadPoolExecutor tx) {
+            this.executor = tx;
+        } else {
+            this.executor = null;
+        }
         this.metadataStoreName = metadataStoreName;
+
+        EXECUTOR_QUEUE_SIZE.setChild(new Gauge.Child() {
+            @Override
+            public double get() {
+                return getQueueSize();
+            }
+        }, metadataStoreName);
+
         this.batchOpsWaitingChild = OPS_WAITING.labels(metadataStoreName);
         this.batchExecuteTimeChild = 
BATCH_EXECUTE_TIME.labels(metadataStoreName);
         this.opsPerBatchChild = OPS_PER_BATCH.labels(metadataStoreName);
+
+        var meter = openTelemetry.getMeter("org.apache.pulsar");
+        var attributes = Attributes.of(MetadataStoreStats.METADATA_STORE_NAME, 
metadataStoreName);
+        this.batchMetadataStoreSizeCounter = meter
+                .upDownCounterBuilder(EXECUTOR_QUEUE_SIZE_METRIC_NAME)
+                .setDescription("The number of batch operations in the 
metadata store executor queue")
+                .setUnit("{operation}")
+                .buildWithCallback(measurement -> 
measurement.record(getQueueSize(), attributes));
+    }
+
+    private int getQueueSize() {
+        return executor == null ? 0 : executor.getQueue().size();
     }
 
     public void recordOpWaiting(long millis) {
@@ -72,9 +111,11 @@ public final class BatchMetadataStoreStats implements 
AutoCloseable {
     @Override
     public void close() throws Exception {
         if (closed.compareAndSet(false, true)) {
+            EXECUTOR_QUEUE_SIZE.remove(this.metadataStoreName);
             OPS_WAITING.remove(this.metadataStoreName);
             BATCH_EXECUTE_TIME.remove(this.metadataStoreName);
             OPS_PER_BATCH.remove(metadataStoreName);
+            batchMetadataStoreSizeCounter.close();
         }
     }
 }
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
index 0ae0b022a35..d42b2228346 100644
--- 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
+++ 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java
@@ -96,7 +96,7 @@ public class MetadataStoreFactoryImplTest {
 
     public static class MyMetadataStore extends AbstractMetadataStore {
         protected MyMetadataStore() {
-            super("custom", OpenTelemetry.noop(), null, 1);
+            super("custom", OpenTelemetry.noop(), null);
         }
 
         @Override

Reply via email to