This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b3432f4ed9c [fix][test] Replace PulsarService Mockito spy solution for overriding getters in tests (#19323)
b3432f4ed9c is described below

commit b3432f4ed9c9c19eef4ed696253eb2c18ebbf59d
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Jan 25 16:50:50 2023 +0200

    [fix][test] Replace PulsarService Mockito spy solution for overriding getters in tests (#19323)
---
 .../org/apache/pulsar/broker/PulsarService.java    |  33 +-
 .../pulsar/broker/service/BrokerService.java       |  14 +-
 .../apache/pulsar/broker/service/ServerCnx.java    |   4 +-
 .../pulsar/broker/PulsarServiceMockSupport.java    |  75 ---
 .../apache/pulsar/broker/TestPulsarService.java    | 420 +++++++++++++++
 .../broker/service/MessageCumulativeAckTest.java   |  73 +--
 .../PersistentDispatcherFailoverConsumerTest.java  | 187 +++----
 .../pulsar/broker/service/PersistentTopicTest.java | 563 ++++++++-------------
 .../broker/service/ServerCnxAuthorizationTest.java | 108 ++--
 ...herFailoverConsumerStreamingDispatcherTest.java |   2 +-
 .../PersistentTopicStreamingDispatcherTest.java    |   8 +-
 .../prometheus/NamespaceStatsAggregatorTest.java   |   7 +-
 12 files changed, 771 insertions(+), 723 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index c33b7a0c2d1..1532b28343c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -627,7 +627,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
                 Duration.ofMillis(Math.max(1L, 
getConfiguration().getBrokerShutdownTimeoutMs())),
                 shutdownExecutor, () -> 
FutureUtil.createTimeoutException("Timeout in close", getClass(), "close"));
         future.handle((v, t) -> {
-            if (t != null) {
+            if (t != null && getConfiguration().getBrokerShutdownTimeoutMs() > 
0) {
                 LOG.info("Shutdown timed out after {} ms", 
getConfiguration().getBrokerShutdownTimeoutMs());
                 LOG.info(ThreadDumpUtil.buildThreadDiagnosticString());
             }
@@ -740,10 +740,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
                 configurationMetadataStore = localMetadataStore;
                 shouldShutdownConfigurationMetadataStore = false;
             }
-            pulsarResources = new PulsarResources(localMetadataStore, 
configurationMetadataStore,
-                    config.getMetadataStoreOperationTimeoutSeconds());
-
-            
pulsarResources.getClusterResources().getStore().registerListener(this::handleDeleteCluster);
+            pulsarResources = newPulsarResources();
 
             orderedExecutor = OrderedExecutor.newBuilder()
                     .numThreads(config.getNumOrderedExecutorThreads())
@@ -757,10 +754,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
             // Now we are ready to start services
             this.bkClientFactory = newBookKeeperClientFactory();
 
-            managedLedgerClientFactory = ManagedLedgerStorage.create(
-                config, localMetadataStore,
-                    bkClientFactory, ioEventLoopGroup
-            );
+            managedLedgerClientFactory = newManagedLedgerClientFactory();
 
             this.brokerService = newBrokerService(this);
 
@@ -925,6 +919,23 @@ public class PulsarService implements AutoCloseable, ShutdownService {
         }
     }
 
+    @VisibleForTesting
+    protected ManagedLedgerStorage newManagedLedgerClientFactory() throws 
Exception {
+        return ManagedLedgerStorage.create(
+                config, localMetadataStore,
+                bkClientFactory, ioEventLoopGroup
+        );
+    }
+
+    @VisibleForTesting
+    protected PulsarResources newPulsarResources() {
+        PulsarResources pulsarResources = new 
PulsarResources(localMetadataStore, configurationMetadataStore,
+                config.getMetadataStoreOperationTimeoutSeconds());
+
+        
pulsarResources.getClusterResources().getStore().registerListener(this::handleDeleteCluster);
+        return pulsarResources;
+    }
+
     private synchronized void createMetricsServlet() {
         this.metricsServlet = new PulsarPrometheusMetricsServlet(
                 this, config.isExposeTopicLevelMetricsInPrometheus(),
@@ -1316,11 +1327,11 @@ public class PulsarService implements AutoCloseable, ShutdownService {
     }
 
     public BookKeeper getBookKeeperClient() {
-        return managedLedgerClientFactory.getBookKeeperClient();
+        return getManagedLedgerClientFactory().getBookKeeperClient();
     }
 
     public ManagedLedgerFactory getManagedLedgerFactory() {
-        return managedLedgerClientFactory.getManagedLedgerFactory();
+        return getManagedLedgerClientFactory().getManagedLedgerFactory();
     }
 
     public ManagedLedgerStorage getManagedLedgerClientFactory() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 5c272a97f2e..618cc089560 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -934,7 +934,8 @@ public class BrokerService implements Closeable {
 
             // unload all namespace-bundles gracefully
             long closeTopicsStartTime = System.nanoTime();
-            Set<NamespaceBundle> serviceUnits = 
pulsar.getNamespaceService().getOwnedServiceUnits();
+            Set<NamespaceBundle> serviceUnits =
+                    pulsar.getNamespaceService() != null ? 
pulsar.getNamespaceService().getOwnedServiceUnits() : null;
             if (serviceUnits != null) {
                 try (RateLimiter rateLimiter = maxConcurrentUnload > 0 ? 
RateLimiter.builder()
                         .scheduledExecutorService(pulsar.getExecutor())
@@ -955,12 +956,13 @@ public class BrokerService implements Closeable {
                         }
                     });
                 }
-            }
 
-            double closeTopicsTimeSeconds = 
TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - closeTopicsStartTime))
-                    / 1000.0;
-            log.info("Unloading {} namespace-bundles completed in {} seconds", 
serviceUnits.size(),
-                    closeTopicsTimeSeconds);
+                double closeTopicsTimeSeconds =
+                        TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - 
closeTopicsStartTime))
+                                / 1000.0;
+                log.info("Unloading {} namespace-bundles completed in {} 
seconds", serviceUnits.size(),
+                        closeTopicsTimeSeconds);
+            }
         } catch (Exception e) {
             log.error("Failed to disable broker from loadbalancer list {}", 
e.getMessage(), e);
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 2a83252c309..71db6e63883 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2777,7 +2777,9 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
      * consumers from connection-map.
      */
     protected void close() {
-        ctx.close();
+        if (ctx != null) {
+            ctx.close();
+        }
     }
 
     @Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceMockSupport.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceMockSupport.java
deleted file mode 100644
index 9aa53164ee7..00000000000
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceMockSupport.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker;
-
-import static org.mockito.Mockito.mock;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
-import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
-import org.apache.pulsar.metadata.impl.ZKMetadataStore;
-import org.mockito.Mockito;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-@Slf4j
-public class PulsarServiceMockSupport {
-
-    /**
-     * see: https://github.com/apache/pulsar/pull/16821.
-     * While executing 
"doReturn(pulsarResources).when(pulsar).getPulsarResources()", Meta Store 
Thread also accesses
-     * variable PulsarService.getPulsarResources() asynchronously in logic: 
"notification by zk-watcher".
-     * So execute mock-cmd in meta-thread (The executor of MetaStore is a 
single thread pool executor, so all things
-     * will be thread safety).
-     * Note: If the MetaStore's executor is no longer single-threaded, should 
mock as single-threaded if you need to
-     * execute this method.
-     */
-    public static void mockPulsarServiceProps(final PulsarService 
pulsarService, Runnable mockTask)
-            throws ExecutionException, InterruptedException, TimeoutException {
-        final CompletableFuture<Void> mockGetPulsarResourceFuture = new 
CompletableFuture<>();
-        MetadataStoreExtended metadataStoreExtended = 
pulsarService.getLocalMetadataStore();
-        if (metadataStoreExtended instanceof AbstractMetadataStore){
-            AbstractMetadataStore abstractMetadataStore = 
(AbstractMetadataStore) metadataStoreExtended;
-            abstractMetadataStore.execute(() -> {
-                mockTask.run();
-                mockGetPulsarResourceFuture.complete(null);
-            }, mock(CompletableFuture.class));
-            try {
-                mockGetPulsarResourceFuture.get(1, TimeUnit.SECONDS);
-            } catch (TimeoutException timeoutException){
-                mockTask.run();
-            }
-        } else {
-            mockTask.run();
-        }
-    }
-
-    @Test
-    public void testMockMetaStore() throws Exception{
-        AtomicInteger integer = new AtomicInteger();
-        PulsarService pulsarService = Mockito.mock(PulsarService.class);
-        
Mockito.when(pulsarService.getLocalMetadataStore()).thenReturn(Mockito.mock(ZKMetadataStore.class));
-        mockPulsarServiceProps(pulsarService, () -> integer.incrementAndGet());
-        Assert.assertEquals(integer.get(), 1);
-    }
-}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/TestPulsarService.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TestPulsarService.java
new file mode 100644
index 00000000000..873be15b8ff
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TestPulsarService.java
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker;
+
+import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgsRecordingInvocations;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.function.Function;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.ToString;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.stats.NullStatsProvider;
+import org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.pulsar.broker.resources.NamespaceResources;
+import org.apache.pulsar.broker.resources.PulsarResources;
+import org.apache.pulsar.broker.resources.TopicResources;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.PulsarMetadataEventSynchronizer;
+import org.apache.pulsar.broker.service.ServerCnx;
+import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService;
+import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
+import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
+import org.apache.pulsar.broker.transaction.TransactionTestBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown;
+import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.impl.MetadataStoreFactoryImpl;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Subclass of PulsarService that is used for some tests.
+ * This was written as a replacement for the previous Mockito Spy over 
PulsarService solution which caused
+ * a flaky test issue https://github.com/apache/pulsar/issues/13620.
+ */
+public class TestPulsarService extends PulsarService {
+
+
+    @ToString
+    @Getter
+    @Builder
+    public static class Factory implements AutoCloseable {
+        private final ServiceConfiguration config;
+        private final MetadataStoreExtended localMetadataStore;
+        private final MetadataStoreExtended configurationMetadataStore;
+        private final PulsarResources pulsarResources;
+
+        private final OrderedExecutor executor;
+
+        private final EventLoopGroup eventLoopGroup;
+
+        private final ManagedLedgerStorage managedLedgerClientFactory;
+
+        private final Function<PulsarService, BrokerService> 
brokerServiceFunction;
+
+        private final boolean useSpies;
+
+        private final PulsarService pulsarService;
+
+        private final Compactor compactor;
+
+        private final BrokerService brokerService;
+
+        public ManagedLedgerFactory getManagedLedgerFactory() {
+            return managedLedgerClientFactory.getManagedLedgerFactory();
+        }
+
+        public static FactoryBuilder builder() {
+            return new CustomFactoryBuilder();
+        }
+
+        public void close() throws Exception {
+            pulsarService.getBrokerService().close();
+            pulsarService.close();
+            GracefulExecutorServicesShutdown.initiate()
+                    .timeout(Duration.ZERO)
+                    .shutdown(executor)
+                    .handle().get();
+            eventLoopGroup.shutdownGracefully().get();
+            if (localMetadataStore != configurationMetadataStore) {
+                localMetadataStore.close();
+                configurationMetadataStore.close();
+            } else {
+                localMetadataStore.close();
+            }
+        }
+
+        public ServerCnx createServerCnxSpy() {
+            return 
spyWithClassAndConstructorArgsRecordingInvocations(ServerCnx.class,
+                    getPulsarService());
+        }
+
+        public static class FactoryBuilder {
+            protected boolean useTestPulsarResources = false;
+            protected MetadataStore pulsarResourcesMetadataStore;
+
+            public FactoryBuilder useTestPulsarResources() {
+                useTestPulsarResources = true;
+                return this;
+            }
+
+            public FactoryBuilder useTestPulsarResources(MetadataStore 
metadataStore) {
+                useTestPulsarResources = true;
+                pulsarResourcesMetadataStore = metadataStore;
+                return this;
+            }
+
+            public FactoryBuilder managedLedgerClients(BookKeeper 
bookKeeperClient,
+                                                       ManagedLedgerFactory 
managedLedgerFactory) {
+                return managedLedgerClientFactory(
+                        
Factory.createManagedLedgerClientFactory(bookKeeperClient, 
managedLedgerFactory));
+            }
+        }
+
+        private static class CustomFactoryBuilder extends 
Factory.FactoryBuilder {
+
+            @Override
+            public Factory build() {
+                initializeDefaults();
+                return super.build();
+            }
+
+            private void initializeDefaults() {
+                try {
+                    if (super.managedLedgerClientFactory == null) {
+                        if (super.executor == null) {
+                            super.executor = 
OrderedExecutor.newBuilder().numThreads(1)
+                                    
.name(TestPulsarService.class.getSimpleName() + "-executor").build();
+                        }
+                        TransactionTestBase.NonClosableMockBookKeeper 
mockBookKeeper =
+                                
TransactionTestBase.createMockBookKeeper(super.executor);
+                        ManagedLedgerFactory mlFactoryMock = 
mock(ManagedLedgerFactory.class);
+
+                        managedLedgerClientFactory(
+                                
Factory.createManagedLedgerClientFactory(mockBookKeeper, mlFactoryMock));
+                    }
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+                if (super.config == null) {
+                    ServiceConfiguration svcConfig = 
spy(ServiceConfiguration.class);
+                    svcConfig.setBrokerShutdownTimeoutMs(0L);
+                    
svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
+                    svcConfig.setClusterName("pulsar-cluster");
+                    config(svcConfig);
+                }
+                if (super.localMetadataStore == null || 
super.configurationMetadataStore == null) {
+                    try {
+                        MetadataStoreExtended store = 
MetadataStoreFactoryImpl.createExtended("memory:local",
+                                MetadataStoreConfig.builder().build());
+                        if (super.localMetadataStore == null) {
+                            localMetadataStore(store);
+                        }
+                        if (super.configurationMetadataStore == null) {
+                            configurationMetadataStore(store);
+                        }
+                    } catch (MetadataStoreException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+                if (super.pulsarResources == null) {
+                    if (useTestPulsarResources) {
+                        MetadataStore metadataStore = 
pulsarResourcesMetadataStore;
+                        if (metadataStore == null) {
+                            metadataStore = super.configurationMetadataStore;
+                        }
+                        NamespaceResources nsr =
+                                
spyWithClassAndConstructorArgs(NamespaceResources.class, metadataStore, 30);
+                        TopicResources tsr = 
spyWithClassAndConstructorArgs(TopicResources.class, metadataStore);
+                        if (!super.useSpies) {
+                            pulsarResources(
+                                    new 
TestPulsarResources(super.localMetadataStore, super.configurationMetadataStore,
+                                            tsr, nsr));
+                        } else {
+                            pulsarResources(
+                                    
spyWithClassAndConstructorArgs(TestPulsarResources.class, 
super.localMetadataStore,
+                                            super.configurationMetadataStore, 
tsr, nsr));
+                        }
+                    } else {
+                        if (!super.useSpies) {
+                            pulsarResources(
+                                    new 
PulsarResources(super.localMetadataStore, super.configurationMetadataStore));
+                        } else {
+                            pulsarResources(
+                                    
spyWithClassAndConstructorArgs(PulsarResources.class, super.localMetadataStore,
+                                            super.configurationMetadataStore));
+                        }
+                    }
+                }
+                if (super.brokerServiceFunction == null) {
+                    if (super.brokerService == null) {
+                        if (super.eventLoopGroup == null) {
+                            super.eventLoopGroup = new NioEventLoopGroup();
+                        }
+                        brokerServiceFunction(pulsarService -> {
+                            try {
+                                if (!super.useSpies) {
+                                    return new BrokerService(pulsarService, 
super.eventLoopGroup);
+                                } else {
+                                    return 
spyWithClassAndConstructorArgs(BrokerService.class, pulsarService,
+                                            super.eventLoopGroup);
+                                }
+                            } catch (Exception e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+                    } else {
+                        brokerServiceFunction(pulsarService -> 
super.brokerService);
+                    }
+                }
+                if (!super.useSpies) {
+                    pulsarService(new TestPulsarService(super.config, 
super.localMetadataStore,
+                            super.configurationMetadataStore, 
super.pulsarResources, super.managedLedgerClientFactory,
+                            super.brokerServiceFunction, super.executor, 
super.compactor));
+                } else {
+                    
pulsarService(spyWithClassAndConstructorArgs(TestPulsarService.class, 
super.config,
+                            super.localMetadataStore, 
super.configurationMetadataStore, super.pulsarResources,
+                            super.managedLedgerClientFactory, 
super.brokerServiceFunction, super.executor,
+                            super.compactor));
+                }
+                if (super.brokerService == null) {
+                    brokerService(super.pulsarService.getBrokerService());
+                }
+            }
+        }
+
+        @NotNull
+        private static ManagedLedgerStorage 
createManagedLedgerClientFactory(BookKeeper bookKeeperClient,
+                                                                             
ManagedLedgerFactory managedLedgerFactory) {
+            return new ManagedLedgerStorage() {
+
+                @Override
+                public void initialize(ServiceConfiguration conf, 
MetadataStoreExtended metadataStore,
+                                       BookKeeperClientFactory 
bookkeeperProvider, EventLoopGroup eventLoopGroup)
+                        throws Exception {
+
+                }
+
+                @Override
+                public ManagedLedgerFactory getManagedLedgerFactory() {
+                    return managedLedgerFactory;
+                }
+
+                @Override
+                public StatsProvider getStatsProvider() {
+                    return new NullStatsProvider();
+                }
+
+                @Override
+                public BookKeeper getBookKeeperClient() {
+                    return bookKeeperClient;
+                }
+
+                @Override
+                public void close() throws IOException {
+
+                }
+            };
+        }
+    }
+
+    private static class TestPulsarResources extends PulsarResources {
+
+        private final TopicResources topicResources;
+        private final NamespaceResources namespaceResources;
+
+        public TestPulsarResources(MetadataStore localMetadataStore, 
MetadataStore configurationMetadataStore,
+                                   TopicResources topicResources, 
NamespaceResources namespaceResources) {
+            super(localMetadataStore, configurationMetadataStore);
+            this.topicResources = topicResources;
+            this.namespaceResources = namespaceResources;
+        }
+
+        @Override
+        public TopicResources getTopicResources() {
+            return topicResources;
+        }
+
+        @Override
+        public NamespaceResources getNamespaceResources() {
+            return namespaceResources;
+        }
+    }
+
+    private final MetadataStoreExtended localMetadataStore;
+    private final MetadataStoreExtended configurationMetadataStore;
+    private final PulsarResources pulsarResources;
+    private final ManagedLedgerStorage managedLedgerClientFactory;
+    private final BrokerService brokerService;
+
+    private final OrderedExecutor executor;
+
+    private final Compactor compactor;
+
+    private final SchemaRegistryService schemaRegistryService;
+
+    private final PulsarClient pulsarClient;
+
+    protected TestPulsarService(ServiceConfiguration config, 
MetadataStoreExtended localMetadataStore,
+                                MetadataStoreExtended 
configurationMetadataStore, PulsarResources pulsarResources,
+                                ManagedLedgerStorage 
managedLedgerClientFactory,
+                                Function<PulsarService, BrokerService> 
brokerServiceFunction, OrderedExecutor executor,
+                                Compactor compactor) {
+        super(config);
+        this.localMetadataStore = localMetadataStore;
+        this.configurationMetadataStore = configurationMetadataStore;
+        this.pulsarResources = pulsarResources;
+        this.managedLedgerClientFactory = managedLedgerClientFactory;
+        this.brokerService = brokerServiceFunction.apply(this);
+        this.executor = executor;
+        this.compactor = compactor;
+        this.schemaRegistryService = 
spyWithClassAndConstructorArgs(DefaultSchemaRegistryService.class);
+        this.pulsarClient = mock(PulsarClientImpl.class);
+    }
+
+    @Override
+    public synchronized PulsarClient getClient() throws PulsarServerException {
+        return pulsarClient;
+    }
+
+    @Override
+    public SchemaRegistryService getSchemaRegistryService() {
+        return schemaRegistryService;
+    }
+
+    @Override
+    public MetadataStore 
createConfigurationMetadataStore(PulsarMetadataEventSynchronizer synchronizer)
+            throws MetadataStoreException {
+        return configurationMetadataStore;
+    }
+
+    @Override
+    public MetadataStoreExtended 
createLocalMetadataStore(PulsarMetadataEventSynchronizer synchronizer)
+            throws MetadataStoreException, PulsarServerException {
+        return localMetadataStore;
+    }
+
+    @Override
+    public PulsarResources getPulsarResources() {
+        return pulsarResources;
+    }
+
+    public BrokerService getBrokerService() {
+        return brokerService;
+    }
+
+    @Override
+    public Compactor getCompactor() throws PulsarServerException {
+        if (compactor != null) {
+            return compactor;
+        } else {
+            return super.getCompactor();
+        }
+    }
+
+    @Override
+    public MetadataStore getConfigurationMetadataStore() {
+        return configurationMetadataStore;
+    }
+
+    @Override
+    public MetadataStoreExtended getLocalMetadataStore() {
+        return localMetadataStore;
+    }
+
+    @Override
+    public ManagedLedgerStorage getManagedLedgerClientFactory() {
+        return managedLedgerClientFactory;
+    }
+
+    @Override
+    public OrderedExecutor getOrderedExecutor() {
+        return executor;
+    }
+
+    @Override
+    protected PulsarResources newPulsarResources() {
+        return pulsarResources;
+    }
+
+    @Override
+    protected ManagedLedgerStorage newManagedLedgerClientFactory() throws 
Exception {
+        return managedLedgerClientFactory;
+    }
+
+    @Override
+    protected BrokerService newBrokerService(PulsarService pulsar) throws 
Exception {
+        return brokerService;
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
index 0a227f6812c..703376b2546 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.broker.service;
 
 import static java.util.Collections.emptyMap;
-import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
 import static org.apache.pulsar.common.api.proto.CommandAck.AckType.Cumulative;
 import static 
org.apache.pulsar.common.api.proto.CommandSubscribe.SubType.Exclusive;
 import static 
org.apache.pulsar.common.api.proto.CommandSubscribe.SubType.Failover;
@@ -36,29 +35,17 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
 import java.net.InetSocketAddress;
-import java.util.Optional;
-import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
-import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
-import org.apache.pulsar.broker.PulsarService;
-import org.apache.pulsar.broker.PulsarServiceMockSupport;
-import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.resources.PulsarResources;
+import org.apache.pulsar.broker.TestPulsarService;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
-import org.apache.pulsar.broker.transaction.TransactionTestBase;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.common.api.proto.CommandAck;
 import org.apache.pulsar.common.api.proto.CommandSubscribe;
 import org.apache.pulsar.common.api.proto.ProtocolVersion;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.metadata.api.MetadataStore;
-import org.apache.pulsar.metadata.api.MetadataStoreConfig;
-import org.apache.pulsar.metadata.api.MetadataStoreFactory;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
@@ -67,45 +54,17 @@ import org.testng.annotations.Test;
 @Test(groups = "broker")
 public class MessageCumulativeAckTest {
     private final int consumerId = 1;
-    private BrokerService brokerService;
+
     private ServerCnx serverCnx;
-    private MetadataStore store;
-    protected PulsarService pulsar;
-    private OrderedExecutor executor;
-    private EventLoopGroup eventLoopGroup;
     private PersistentSubscription sub;
+    private TestPulsarService.Factory testPulsarServiceFactory;
 
     @BeforeMethod
     public void setup() throws Exception {
-        executor = 
OrderedExecutor.newBuilder().numThreads(1).name("persistent-dispatcher-cumulative-ack-test").build();
-        ServiceConfiguration svcConfig = spy(ServiceConfiguration.class);
-        svcConfig.setBrokerShutdownTimeoutMs(0L);
-        svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
-        svcConfig.setClusterName("pulsar-cluster");
-        pulsar = spyWithClassAndConstructorArgs(PulsarService.class, 
svcConfig);
-        doReturn(svcConfig).when(pulsar).getConfiguration();
-
-        ManagedLedgerFactory mlFactoryMock = mock(ManagedLedgerFactory.class);
-        doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
-        doReturn(TransactionTestBase.createMockBookKeeper(executor))
-            .when(pulsar).getBookKeeperClient();
-
-        store = MetadataStoreFactory.create("memory:local", 
MetadataStoreConfig.builder().build());
-        doReturn(store).when(pulsar).getLocalMetadataStore();
-        doReturn(store).when(pulsar).getConfigurationMetadataStore();
-
-        PulsarResources pulsarResources = new PulsarResources(store, store);
-        PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
-            doReturn(pulsarResources).when(pulsar).getPulsarResources();
-        });
-
-        eventLoopGroup = new NioEventLoopGroup();
-        brokerService = spyWithClassAndConstructorArgs(BrokerService.class, 
pulsar, eventLoopGroup);
-        PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
-            doReturn(brokerService).when(pulsar).getBrokerService();
-        });
-
-        serverCnx = spyWithClassAndConstructorArgs(ServerCnx.class, pulsar);
+        testPulsarServiceFactory = TestPulsarService.Factory.builder()
+                .build();
+
+        serverCnx = testPulsarServiceFactory.createServerCnxSpy();
         doReturn(true).when(serverCnx).isActive();
         doReturn(true).when(serverCnx).isWritable();
         doReturn(new InetSocketAddress("localhost", 
1234)).when(serverCnx).clientAddress();
@@ -115,7 +74,7 @@ public class MessageCumulativeAckTest {
                 .when(serverCnx).getCommandSender();
 
         String topicName = 
TopicName.get("MessageCumulativeAckTest").toString();
-        PersistentTopic persistentTopic = new PersistentTopic(topicName, 
mock(ManagedLedger.class), brokerService);
+        PersistentTopic persistentTopic = new PersistentTopic(topicName, 
mock(ManagedLedger.class), testPulsarServiceFactory.getBrokerService());
         sub = spy(new PersistentSubscription(persistentTopic, "sub-1",
             mock(ManagedCursorImpl.class), false));
         doNothing().when(sub).acknowledgeMessage(any(), any(), any());
@@ -123,20 +82,10 @@ public class MessageCumulativeAckTest {
 
     @AfterMethod(alwaysRun = true)
     public void shutdown() throws Exception {
-        if (brokerService != null) {
-            brokerService.close();
-            brokerService = null;
-        }
-        if (pulsar != null) {
-            pulsar.close();
-            pulsar = null;
-        }
-
-        executor.shutdown();
-        if (eventLoopGroup != null) {
-            eventLoopGroup.shutdownGracefully().get();
+        if (testPulsarServiceFactory != null) {
+            testPulsarServiceFactory.close();
+            testPulsarServiceFactory = null;
         }
-        store.close();
         sub = null;
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index 5381a367afd..2bebac14023 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.broker.service;
 
-import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
 import static 
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.matches;
@@ -38,8 +37,6 @@ import static org.testng.AssertJUnit.assertSame;
 import static org.testng.AssertJUnit.assertTrue;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
 import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
@@ -51,7 +48,6 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.function.Supplier;
-import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
@@ -61,19 +57,15 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
-import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.pulsar.broker.PulsarService;
-import org.apache.pulsar.broker.PulsarServiceMockSupport;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.TestPulsarService;
 import org.apache.pulsar.broker.namespace.NamespaceService;
-import org.apache.pulsar.broker.resources.PulsarResources;
 import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
 import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
-import org.apache.pulsar.broker.transaction.TransactionTestBase;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.common.api.proto.BaseCommand;
 import org.apache.pulsar.common.api.proto.CommandActiveConsumerChange;
@@ -82,12 +74,6 @@ import 
org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.ProtocolVersion;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.metadata.api.MetadataStore;
-import org.apache.pulsar.metadata.api.MetadataStoreConfig;
-import org.apache.pulsar.metadata.api.MetadataStoreFactory;
-import org.apache.zookeeper.ZooKeeper;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -98,59 +84,30 @@ import org.testng.annotations.Test;
 @Test(groups = "quarantine")
 public class PersistentDispatcherFailoverConsumerTest {
 
-    private BrokerService brokerService;
-    private ManagedLedgerFactory mlFactoryMock;
     private ServerCnx serverCnx;
     private ServerCnx serverCnxWithOldVersion;
     private ManagedLedger ledgerMock;
     private ManagedCursor cursorMock;
-    private MetadataStore store;
     private ChannelHandlerContext channelCtx;
     private LinkedBlockingQueue<CommandActiveConsumerChange> consumerChanges;
-    private ZooKeeper mockZk;
-    protected PulsarService pulsar;
+
+    protected TestPulsarService.Factory testPulsarServiceFactory;
+
     final String successTopicName = 
"persistent://part-perf/global/perf.t1/ptopic";
     final String failTopicName = 
"persistent://part-perf/global/perf.t1/pfailTopic";
 
-    private OrderedExecutor executor;
-    private EventLoopGroup eventLoopGroup;
-
     @BeforeMethod
     public void setup() throws Exception {
-        executor = 
OrderedExecutor.newBuilder().numThreads(1).name("persistent-dispatcher-failover-test").build();
         ServiceConfiguration svcConfig = spy(ServiceConfiguration.class);
         svcConfig.setBrokerShutdownTimeoutMs(0L);
         svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         svcConfig.setClusterName("pulsar-cluster");
         svcConfig.setSystemTopicEnabled(false);
         svcConfig.setTopicLevelPoliciesEnabled(false);
-        pulsar = spyWithClassAndConstructorArgs(PulsarService.class, 
svcConfig);
-        store = MetadataStoreFactory.create("memory:local", 
MetadataStoreConfig.builder().build());
-        doReturn(store).when(pulsar).getLocalMetadataStore();
-        doReturn(store).when(pulsar).getConfigurationMetadataStore();
-
-        PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
-            doReturn(svcConfig).when(pulsar).getConfiguration();
-        });
-
-        mlFactoryMock = mock(ManagedLedgerFactory.class);
-        PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
-            doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
-        });
-
-        doReturn(TransactionTestBase.createMockBookKeeper(executor))
-                .when(pulsar).getBookKeeperClient();
-        eventLoopGroup = new NioEventLoopGroup();
-
-        PulsarResources pulsarResources = new PulsarResources(store, store);
-        PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
-            doReturn(pulsarResources).when(pulsar).getPulsarResources();
-        });
-
-        brokerService = spyWithClassAndConstructorArgs(BrokerService.class, 
pulsar, eventLoopGroup);
-        PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
-            doReturn(brokerService).when(pulsar).getBrokerService();
-        });
+        testPulsarServiceFactory = TestPulsarService.Factory.builder()
+                .config(svcConfig)
+                .useSpies(true)
+                .build();
 
         consumerChanges = new LinkedBlockingQueue<>();
         this.channelCtx = mock(ChannelHandlerContext.class);
@@ -175,7 +132,7 @@ public class PersistentDispatcherFailoverConsumerTest {
             return null;
         }).when(channelCtx).writeAndFlush(any(), any());
 
-        serverCnx = spyWithClassAndConstructorArgs(ServerCnx.class, pulsar);
+        serverCnx = testPulsarServiceFactory.createServerCnxSpy();
         doReturn(true).when(serverCnx).isActive();
         doReturn(true).when(serverCnx).isWritable();
         doReturn(new InetSocketAddress("localhost", 
1234)).when(serverCnx).clientAddress();
@@ -184,7 +141,7 @@ public class PersistentDispatcherFailoverConsumerTest {
         doReturn(new PulsarCommandSenderImpl(null, serverCnx))
                 .when(serverCnx).getCommandSender();
 
-        serverCnxWithOldVersion = 
spyWithClassAndConstructorArgs(ServerCnx.class, pulsar);
+        serverCnxWithOldVersion = 
testPulsarServiceFactory.createServerCnxSpy();
         doReturn(true).when(serverCnxWithOldVersion).isActive();
         doReturn(true).when(serverCnxWithOldVersion).isWritable();
         doReturn(new InetSocketAddress("localhost", 1234))
@@ -196,9 +153,7 @@ public class PersistentDispatcherFailoverConsumerTest {
                 .when(serverCnxWithOldVersion).getCommandSender();
 
         NamespaceService nsSvc = mock(NamespaceService.class);
-        PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
-            doReturn(nsSvc).when(pulsar).getNamespaceService();
-        });
+        
doReturn(nsSvc).when(testPulsarServiceFactory.getPulsarService()).getNamespaceService();
         
doReturn(true).when(nsSvc).isServiceUnitOwned(any(NamespaceBundle.class));
         doReturn(true).when(nsSvc).isServiceUnitActive(any(TopicName.class));
         
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkTopicOwnership(any(TopicName.class));
@@ -209,23 +164,9 @@ public class PersistentDispatcherFailoverConsumerTest {
 
     @AfterMethod(alwaysRun = true)
     public void shutdown() throws Exception {
-        if (brokerService != null) {
-            brokerService.close();
-            brokerService = null;
-        }
-        if (pulsar != null) {
-            pulsar.close();
-            pulsar = null;
-        }
-
-        if (executor != null) {
-            executor.shutdown();
-        }
-        if (eventLoopGroup != null) {
-            eventLoopGroup.shutdownGracefully().get();
-        }
-        if (store != null) {
-            store.close();
+        if (testPulsarServiceFactory != null) {
+            testPulsarServiceFactory.close();
+            testPulsarServiceFactory = null;
         }
     }
 
@@ -233,64 +174,50 @@ public class PersistentDispatcherFailoverConsumerTest {
         ledgerMock = mock(ManagedLedger.class);
         cursorMock = mock(ManagedCursorImpl.class);
 
-        doReturn(new ArrayList<Object>()).when(ledgerMock).getCursors();
+        doReturn(new ArrayList<>()).when(ledgerMock).getCursors();
         doReturn("mockCursor").when(cursorMock).getName();
 
         // call openLedgerComplete with ledgerMock on ML factory asyncOpen
-        doAnswer(new Answer<Object>() {
-            @Override
-            public Object answer(InvocationOnMock invocationOnMock) throws 
Throwable {
-                ((OpenLedgerCallback) 
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
-                return null;
-            }
-        }).when(mlFactoryMock).asyncOpen(matches(".*success.*"), 
any(ManagedLedgerConfig.class),
-                any(OpenLedgerCallback.class), any(Supplier.class), any());
+        doAnswer(invocationOnMock -> {
+            ((OpenLedgerCallback) 
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
+            return null;
+        }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+                .asyncOpen(matches(".*success.*"), 
any(ManagedLedgerConfig.class),
+                        any(OpenLedgerCallback.class), any(Supplier.class), 
any());
 
         // call openLedgerFailed on ML factory asyncOpen
-        doAnswer(new Answer<Object>() {
-            @Override
-            public Object answer(InvocationOnMock invocationOnMock) throws 
Throwable {
-                ((OpenLedgerCallback) invocationOnMock.getArguments()[2])
-                        .openLedgerFailed(new ManagedLedgerException("Managed 
ledger failure"), null);
-                return null;
-            }
-        }).when(mlFactoryMock).asyncOpen(matches(".*fail.*"), 
any(ManagedLedgerConfig.class),
-                any(OpenLedgerCallback.class), any(Supplier.class), any());
+        doAnswer(invocationOnMock -> {
+            ((OpenLedgerCallback) invocationOnMock.getArguments()[2])
+                    .openLedgerFailed(new ManagedLedgerException("Managed 
ledger failure"), null);
+            return null;
+        }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+                .asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class),
+                        any(OpenLedgerCallback.class), any(Supplier.class), 
any());
 
         // call addComplete on ledger asyncAddEntry
-        doAnswer(new Answer<Object>() {
-            @Override
-            public Object answer(InvocationOnMock invocationOnMock) throws 
Throwable {
-                ((AddEntryCallback) 
invocationOnMock.getArguments()[1]).addComplete(
-                        new PositionImpl(1, 1), null, null);
-                return null;
-            }
+        doAnswer(invocationOnMock -> {
+            ((AddEntryCallback) 
invocationOnMock.getArguments()[1]).addComplete(
+                    new PositionImpl(1, 1), null, null);
+            return null;
         }).when(ledgerMock).asyncAddEntry(any(byte[].class), 
any(AddEntryCallback.class), any());
 
         // call openCursorComplete on cursor asyncOpen
-        doAnswer(new Answer<Object>() {
-            @Override
-            public Object answer(InvocationOnMock invocationOnMock) throws 
Throwable {
-                ((OpenCursorCallback) 
invocationOnMock.getArguments()[2]).openCursorComplete(cursorMock, null);
-                return null;
-            }
-        }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), 
any(InitialPosition.class), any(OpenCursorCallback.class), any());
+        doAnswer(invocationOnMock -> {
+            ((OpenCursorCallback) 
invocationOnMock.getArguments()[2]).openCursorComplete(cursorMock, null);
+            return null;
+        }).when(ledgerMock)
+                .asyncOpenCursor(matches(".*success.*"), 
any(InitialPosition.class), any(OpenCursorCallback.class),
+                        any());
 
         // call deleteLedgerComplete on ledger asyncDelete
-        doAnswer(new Answer<Object>() {
-            @Override
-            public Object answer(InvocationOnMock invocationOnMock) throws 
Throwable {
-                ((DeleteLedgerCallback) 
invocationOnMock.getArguments()[0]).deleteLedgerComplete(null);
-                return null;
-            }
+        doAnswer(invocationOnMock -> {
+            ((DeleteLedgerCallback) 
invocationOnMock.getArguments()[0]).deleteLedgerComplete(null);
+            return null;
         }).when(ledgerMock).asyncDelete(any(DeleteLedgerCallback.class), 
any());
 
-        doAnswer(new Answer<Object>() {
-            @Override
-            public Object answer(InvocationOnMock invocationOnMock) throws 
Throwable {
-                ((DeleteCursorCallback) 
invocationOnMock.getArguments()[1]).deleteCursorComplete(null);
-                return null;
-            }
+        doAnswer(invocationOnMock -> {
+            ((DeleteCursorCallback) 
invocationOnMock.getArguments()[1]).deleteCursorComplete(null);
+            return null;
         }).when(ledgerMock).asyncDeleteCursor(matches(".*success.*"), 
any(DeleteCursorCallback.class), any());
     }
 
@@ -303,7 +230,8 @@ public class PersistentDispatcherFailoverConsumerTest {
 
     @Test
     public void testConsumerGroupChangesWithOldNewConsumers() throws Exception 
{
-        PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
+        PersistentTopic topic =
+                new PersistentTopic(successTopicName, ledgerMock, 
testPulsarServiceFactory.getBrokerService());
         PersistentSubscription sub = new PersistentSubscription(topic, 
"sub-1", cursorMock, false);
 
         int partitionIndex = 0;
@@ -315,7 +243,8 @@ public class PersistentDispatcherFailoverConsumerTest {
 
         // 2. Add old consumer
         Consumer consumer1 = new Consumer(sub, SubType.Exclusive, 
topic.getName(), 1 /* consumer id */, 0,
-                "Cons1"/* consumer name */, true, serverCnxWithOldVersion, 
"myrole-1", Collections.emptyMap(), false, null, MessageId.latest, 
DEFAULT_CONSUMER_EPOCH);
+                "Cons1"/* consumer name */, true, serverCnxWithOldVersion, 
"myrole-1", Collections.emptyMap(), false,
+                null, MessageId.latest, DEFAULT_CONSUMER_EPOCH);
         pdfc.addConsumer(consumer1);
         List<Consumer> consumers = pdfc.getConsumers();
         assertSame(consumers.get(0).consumerName(), consumer1.consumerName());
@@ -326,7 +255,8 @@ public class PersistentDispatcherFailoverConsumerTest {
 
         // 3. Add new consumer
         Consumer consumer2 = new Consumer(sub, SubType.Exclusive, 
topic.getName(), 2 /* consumer id */, 0,
-                "Cons2"/* consumer name */, true, serverCnx, "myrole-1", 
Collections.emptyMap(), false, null, MessageId.latest, DEFAULT_CONSUMER_EPOCH);
+                "Cons2"/* consumer name */, true, serverCnx, "myrole-1", 
Collections.emptyMap(), false, null,
+                MessageId.latest, DEFAULT_CONSUMER_EPOCH);
         pdfc.addConsumer(consumer2);
         consumers = pdfc.getConsumers();
         assertSame(consumers.get(0).consumerName(), consumer1.consumerName());
@@ -342,7 +272,7 @@ public class PersistentDispatcherFailoverConsumerTest {
     public void testAddRemoveConsumer() throws Exception {
         log.info("--- Starting 
PersistentDispatcherFailoverConsumerTest::testAddConsumer ---");
 
-        PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
+        PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, testPulsarServiceFactory.getBrokerService());
         PersistentSubscription sub = new PersistentSubscription(topic, 
"sub-1", cursorMock, false);
 
         int partitionIndex = 4;
@@ -473,7 +403,8 @@ public class PersistentDispatcherFailoverConsumerTest {
     public void testAddRemoveConsumerNonPartitionedTopic() throws Exception {
         log.info("--- Starting 
PersistentDispatcherFailoverConsumerTest::testAddConsumer ---");
 
-        PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
+        PersistentTopic topic =
+                new PersistentTopic(successTopicName, ledgerMock, 
testPulsarServiceFactory.getBrokerService());
         PersistentSubscription sub = new PersistentSubscription(topic, 
"sub-1", cursorMock, false);
 
         // Non partitioned topic.
@@ -533,7 +464,8 @@ public class PersistentDispatcherFailoverConsumerTest {
     @Test
     public void 
testMultipleDispatcherGetNextConsumerWithDifferentPriorityLevel() throws 
Exception {
 
-        PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
+        PersistentTopic topic =
+                new PersistentTopic(successTopicName, ledgerMock, 
testPulsarServiceFactory.getBrokerService());
         PersistentDispatcherMultipleConsumers dispatcher = new 
PersistentDispatcherMultipleConsumers(topic, cursorMock, null);
         Consumer consumer1 = createConsumer(topic, 0, 2, false, 1);
         Consumer consumer2 = createConsumer(topic, 0, 2, false, 2);
@@ -577,7 +509,8 @@ public class PersistentDispatcherFailoverConsumerTest {
 
     @Test
     public void testFewBlockedConsumerSamePriority() throws Exception{
-        PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
+        PersistentTopic topic =
+                new PersistentTopic(successTopicName, ledgerMock, 
testPulsarServiceFactory.getBrokerService());
         PersistentDispatcherMultipleConsumers dispatcher = new 
PersistentDispatcherMultipleConsumers(topic, cursorMock, null);
         Consumer consumer1 = createConsumer(topic, 0, 2, false, 1);
         Consumer consumer2 = createConsumer(topic, 0, 2, false, 2);
@@ -604,7 +537,8 @@ public class PersistentDispatcherFailoverConsumerTest {
 
     @Test
     public void testFewBlockedConsumerDifferentPriority() throws Exception {
-        PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
+        PersistentTopic topic =
+                new PersistentTopic(successTopicName, ledgerMock, 
testPulsarServiceFactory.getBrokerService());
         PersistentDispatcherMultipleConsumers dispatcher = new 
PersistentDispatcherMultipleConsumers(topic, cursorMock, null);
         Consumer consumer1 = createConsumer(topic, 0, 2, false, 1);
         Consumer consumer2 = createConsumer(topic, 0, 2, false, 2);
@@ -658,7 +592,8 @@ public class PersistentDispatcherFailoverConsumerTest {
 
     @Test
     public void testFewBlockedConsumerDifferentPriority2() throws Exception {
-        PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
+        PersistentTopic topic =
+                new PersistentTopic(successTopicName, ledgerMock, 
testPulsarServiceFactory.getBrokerService());
         PersistentDispatcherMultipleConsumers dispatcher = new 
PersistentDispatcherMultipleConsumers(topic, cursorMock, null);
         Consumer consumer1 = createConsumer(topic, 0, 2, true, 1);
         Consumer consumer2 = createConsumer(topic, 0, 2, true, 2);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 8e3c3c73054..ae9fb9a1bf7 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -18,9 +18,7 @@
  */
 package org.apache.pulsar.broker.service;
 
-import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
 import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgsRecordingInvocations;
-import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
 import static 
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
@@ -48,14 +46,11 @@ import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.DefaultEventLoop;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URL;
-import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -76,7 +71,6 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
 import lombok.Cleanup;
-import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
@@ -90,19 +84,15 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
-import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.apache.pulsar.broker.PulsarService;
-import org.apache.pulsar.broker.PulsarServiceMockSupport;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.TestPulsarService;
 import org.apache.pulsar.broker.namespace.NamespaceService;
-import org.apache.pulsar.broker.resources.NamespaceResources;
-import org.apache.pulsar.broker.resources.PulsarResources;
-import org.apache.pulsar.broker.resources.TopicResources;
 import org.apache.pulsar.broker.service.persistent.CompactorSubscription;
 import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator;
 import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
@@ -110,8 +100,6 @@ import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleAct
 import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
-import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService;
-import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
@@ -128,7 +116,6 @@ import org.apache.pulsar.common.api.proto.KeySharedMode;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.ProducerAccessMode;
 import org.apache.pulsar.common.naming.NamespaceBundle;
-import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.Policies;
@@ -138,9 +125,7 @@ import org.apache.pulsar.common.protocol.ByteBufPair;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.FutureUtil;
-import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
-import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.apache.pulsar.compaction.CompactedTopic;
 import org.apache.pulsar.compaction.CompactedTopicContext;
 import org.apache.pulsar.compaction.Compactor;
@@ -149,7 +134,6 @@ import 
org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
 import org.awaitility.Awaitility;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -160,10 +144,6 @@ import org.testng.annotations.Test;
 
 @Test(groups = "broker")
 public class PersistentTopicTest extends MockedBookKeeperTestCase {
-    protected PulsarService pulsar;
-    private BrokerService brokerService;
-    private SchemaRegistryService schemaRegistryService;
-    private ManagedLedgerFactory mlFactoryMock;
     private ServerCnx serverCnx;
     private ManagedLedger ledgerMock;
     private ManagedCursor cursorMock;
@@ -175,14 +155,14 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
     final String successSubName2 = "successSub2";
     private static final Logger log = 
LoggerFactory.getLogger(PersistentTopicTest.class);
 
-    private OrderedExecutor executor;
-    private EventLoopGroup eventLoopGroup;
+    protected TestPulsarService.Factory testPulsarServiceFactory;
+
+    private BrokerService brokerService;
+
 
     @BeforeMethod(alwaysRun = true)
     public void setup() throws Exception {
-        eventLoopGroup = new NioEventLoopGroup();
-        executor = OrderedExecutor.newBuilder().numThreads(1).build();
-        ServiceConfiguration svcConfig = spy(ServiceConfiguration.class);
+        ServiceConfiguration svcConfig = new ServiceConfiguration();
         svcConfig.setAdvertisedAddress("localhost");
         svcConfig.setBrokerShutdownTimeoutMs(0L);
         svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
@@ -190,47 +170,24 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         svcConfig.setClusterName("pulsar-cluster");
         svcConfig.setTopicLevelPoliciesEnabled(false);
         svcConfig.setSystemTopicEnabled(false);
-        pulsar = spyWithClassAndConstructorArgs(PulsarService.class, 
svcConfig);
-        doReturn(svcConfig).when(pulsar).getConfiguration();
-        doReturn(mock(Compactor.class)).when(pulsar).getCompactor();
-
-        // create SchemaRegistryService for testDeleteTopic() otherwise it 
will fail
-        schemaRegistryService = 
spyWithClassAndConstructorArgs(DefaultSchemaRegistryService.class);
-        
doReturn(schemaRegistryService).when(pulsar).getSchemaRegistryService();
-
-        mlFactoryMock = mock(ManagedLedgerFactory.class);
-        doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
-        doReturn(mock(PulsarClientImpl.class)).when(pulsar).getClient();
+        testPulsarServiceFactory = TestPulsarService.Factory.builder()
+                .config(svcConfig)
+                .useSpies(true)
+                .useTestPulsarResources(metadataStore)
+                .compactor(mock(Compactor.class))
+                .build();
+        brokerService = testPulsarServiceFactory.getBrokerService();
 
         doAnswer(invocationOnMock -> CompletableFuture.completedFuture(null))
-                .when(mlFactoryMock).getManagedLedgerPropertiesAsync(any());
+                
.when(testPulsarServiceFactory.getManagedLedgerFactory()).getManagedLedgerPropertiesAsync(any());
         doAnswer(invocation -> {
             DeleteLedgerCallback deleteLedgerCallback = 
invocation.getArgument(1);
             deleteLedgerCallback.deleteLedgerComplete(null);
             return null;
-        }).when(mlFactoryMock).asyncDelete(any(), any(), any());
-        // Mock metaStore.
-        doReturn(createMockBookKeeper(executor))
-            .when(pulsar).getBookKeeperClient();
-        doReturn(executor).when(pulsar).getOrderedExecutor();
-        doReturn(metadataStore).when(pulsar).getLocalMetadataStore();
-        doReturn(metadataStore).when(pulsar).getConfigurationMetadataStore();
-        // Mock pulsarResources.
-        PulsarResources pulsarResources = 
spyWithClassAndConstructorArgs(PulsarResources.class, metadataStore, 
metadataStore);
-        NamespaceResources nsr = 
spyWithClassAndConstructorArgs(NamespaceResources.class, metadataStore, 30);
-        TopicResources tsr = 
spyWithClassAndConstructorArgs(TopicResources.class, metadataStore);
-        doReturn(nsr).when(pulsarResources).getNamespaceResources();
-        doReturn(tsr).when(pulsarResources).getTopicResources();
-        PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
-            doReturn(pulsarResources).when(pulsar).getPulsarResources();
-        });
-        // Mock brokerService.
-        brokerService = spyWithClassAndConstructorArgs(BrokerService.class, 
pulsar, eventLoopGroup);
-        PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
-            doReturn(brokerService).when(pulsar).getBrokerService();
-        });
+        
}).when(testPulsarServiceFactory.getManagedLedgerFactory()).asyncDelete(any(), 
any(), any());
         // Mock serviceCnx.
-        serverCnx = 
spyWithClassAndConstructorArgsRecordingInvocations(ServerCnx.class, pulsar);
+        serverCnx = 
spyWithClassAndConstructorArgsRecordingInvocations(ServerCnx.class,
+                testPulsarServiceFactory.getPulsarService());
         doReturn(true).when(serverCnx).isActive();
         doReturn(true).when(serverCnx).isWritable();
         doReturn(new InetSocketAddress("localhost", 
1234)).when(serverCnx).clientAddress();
@@ -245,9 +202,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         NamespaceService nsSvc = mock(NamespaceService.class);
         NamespaceBundle bundle = mock(NamespaceBundle.class);
         
doReturn(CompletableFuture.completedFuture(bundle)).when(nsSvc).getBundleAsync(any());
-        PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
-            doReturn(nsSvc).when(pulsar).getNamespaceService();
-        });
+        
doReturn(nsSvc).when(testPulsarServiceFactory.getPulsarService()).getNamespaceService();
         doReturn(true).when(nsSvc).isServiceUnitOwned(any());
         doReturn(true).when(nsSvc).isServiceUnitActive(any());
         
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).isServiceUnitActiveAsync(any());
@@ -258,13 +213,10 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
 
     @AfterMethod(alwaysRun = true)
     public void teardown() throws Exception {
-        brokerService.close();
-        pulsar.close();
-        GracefulExecutorServicesShutdown.initiate()
-                .timeout(Duration.ZERO)
-                .shutdown(executor)
-                .handle().get();
-        EventLoopUtil.shutdownGracefully(eventLoopGroup).get();
+        if (testPulsarServiceFactory != null) {
+            testPulsarServiceFactory.close();
+            testPulsarServiceFactory = null;
+        }
     }
 
     @Test
@@ -273,18 +225,15 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         doReturn(new ArrayList<>()).when(ledgerMock).getCursors();
 
         final String topicName = "persistent://prop/use/ns-abc/topic1";
-        doAnswer(new Answer<Object>() {
-            @Override
-            public Object answer(InvocationOnMock invocationOnMock) {
-                ((OpenLedgerCallback) 
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
-                return null;
-            }
-        }).when(mlFactoryMock).asyncOpen(anyString(), 
any(ManagedLedgerConfig.class), any(OpenLedgerCallback.class),
-                any(Supplier.class), any());
+        doAnswer(invocationOnMock -> {
+            ((OpenLedgerCallback) 
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
+            return null;
+        }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+                .asyncOpen(anyString(), any(ManagedLedgerConfig.class), 
any(OpenLedgerCallback.class),
+                        any(Supplier.class), any());
 
-        CompletableFuture<Void> future = 
brokerService.getOrCreateTopic(topicName).thenAccept(topic -> {
-            assertTrue(topic.toString().contains(topicName));
-        }).exceptionally((t) -> {
+        CompletableFuture<Void> future = 
brokerService.getOrCreateTopic(topicName).thenAccept(topic ->
+                
assertTrue(topic.toString().contains(topicName))).exceptionally((t) -> {
             fail("should not fail");
             return null;
         });
@@ -300,16 +249,14 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
     @Test
     public void testCreateTopicMLFailure() {
         final String jinxedTopicName = "persistent://prop/use/ns-abc/topic3";
-        doAnswer(new Answer<Object>() {
-            @Override
-            public Object answer(InvocationOnMock invocationOnMock) {
-                new Thread(() -> ((OpenLedgerCallback) 
invocationOnMock.getArguments()[2])
-                        .openLedgerFailed(new ManagedLedgerException("Managed 
ledger failure"), null)).start();
+        doAnswer(invocationOnMock -> {
+            new Thread(() -> ((OpenLedgerCallback) 
invocationOnMock.getArguments()[2])
+                    .openLedgerFailed(new ManagedLedgerException("Managed 
ledger failure"), null)).start();
 
-                return null;
-            }
-        }).when(mlFactoryMock).asyncOpen(anyString(), 
any(ManagedLedgerConfig.class), any(OpenLedgerCallback.class),
-                any(Supplier.class), any());
+            return null;
+        }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+                .asyncOpen(anyString(), any(ManagedLedgerConfig.class), 
any(OpenLedgerCallback.class),
+                        any(Supplier.class), any());
 
         CompletableFuture<Topic> future = 
brokerService.getOrCreateTopic(jinxedTopicName);
 
@@ -367,7 +314,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
     }
 
     @Test
-    public void testDispatcherMultiConsumerReadFailed() throws Exception {
+    public void testDispatcherMultiConsumerReadFailed() {
         PersistentTopic topic = 
spyWithClassAndConstructorArgsRecordingInvocations(PersistentTopic.class, 
successTopicName, ledgerMock, brokerService);
         ManagedCursor cursor = mock(ManagedCursor.class);
         when(cursor.getName()).thenReturn("cursor");
@@ -379,7 +326,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
     }
 
     @Test
-    public void testDispatcherSingleConsumerReadFailed() throws Exception {
+    public void testDispatcherSingleConsumerReadFailed() {
         PersistentTopic topic = 
spyWithClassAndConstructorArgsRecordingInvocations(PersistentTopic.class, 
successTopicName, ledgerMock, brokerService);
         ManagedCursor cursor = mock(ManagedCursor.class);
         when(cursor.getName()).thenReturn("cursor");
@@ -395,26 +342,18 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         final String successTopicName = 
"persistent://prop/use/ns-abc/successTopic";
 
         final ManagedLedger ledgerMock = mock(ManagedLedger.class);
-        doReturn(new ArrayList<Object>()).when(ledgerMock).getCursors();
+        doReturn(new ArrayList<>()).when(ledgerMock).getCursors();
 
         PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
 
-        MessageMetadata messageMetadata = new MessageMetadata()
-                .setPublishTime(System.currentTimeMillis())
-                .setProducerName("prod-name")
-                .setSequenceId(1);
-
         ByteBuf payload = Unpooled.wrappedBuffer("content".getBytes());
         final CountDownLatch latch = new CountDownLatch(1);
 
         // override asyncAddEntry callback to return error
-        doAnswer(new Answer<Object>() {
-            @Override
-            public Object answer(InvocationOnMock invocationOnMock) throws 
Throwable {
-                ((AddEntryCallback) 
invocationOnMock.getArguments()[1]).addFailed(
-                        new ManagedLedgerException("Managed ledger failure"), 
invocationOnMock.getArguments()[2]);
-                return null;
-            }
+        doAnswer((Answer<Object>) invocationOnMock -> {
+            ((AddEntryCallback) invocationOnMock.getArguments()[1]).addFailed(
+                    new ManagedLedgerException("Managed ledger failure"), 
invocationOnMock.getArguments()[2]);
+            return null;
         }).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), 
any(AddEntryCallback.class), any());
 
         topic.publishMessage(payload, (exception, ledgerId, entryId) -> {
@@ -429,7 +368,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
     }
 
     @Test
-    public void testAddRemoveProducer() throws Exception {
+    public void testAddRemoveProducer() {
         PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
 
         String role = "appid1";
@@ -479,7 +418,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
     }
 
     @Test
-    public void testProducerOverwrite() throws Exception {
+    public void testProducerOverwrite() {
         PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
         String role = "appid1";
         Producer producer1 = new Producer(topic, serverCnx, 1 /* producer id 
*/, "prod-name",
@@ -548,7 +487,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         topic.getProducers().values().forEach(producer -> 
Assert.assertEquals(producer.getEpoch(), 3));
     }
 
-    private void testMaxProducers() throws Exception {
+    private void testMaxProducers() {
         PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
         topic.initialize();
         String role = "appid1";
@@ -576,40 +515,28 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
     }
 
     @Test
-    public void testMaxProducersForBroker() throws Exception {
+    public void testMaxProducersForBroker() {
         // set max clients
-        ServiceConfiguration svcConfig = spy(ServiceConfiguration.class);
-        doReturn(2).when(svcConfig).getMaxProducersPerTopic();
-        doReturn(svcConfig).when(pulsar).getConfiguration();
+        testPulsarServiceFactory.getConfig().setMaxProducersPerTopic(2);
         testMaxProducers();
     }
 
     @Test
     public void testMaxProducersForNamespace() throws Exception {
-        ServiceConfiguration svcConfig = spy(ServiceConfiguration.class);
-        doReturn(svcConfig).when(pulsar).getConfiguration();
         // set max clients
         Policies policies = new Policies();
         policies.max_producers_per_topic = 2;
-        when(pulsar.getPulsarResources().getNamespaceResources()
-                
.getPoliciesIfCached(TopicName.get(successTopicName).getNamespaceObject()))
-                .thenReturn(Optional.of(policies));
-
-        when(pulsar.getPulsarResources().getNamespaceResources()
-                
.getPolicies(TopicName.get(successTopicName).getNamespaceObject()))
-                .thenReturn(Optional.of(policies));
-        when(pulsar.getPulsarResources().getNamespaceResources()
-                
.getPoliciesAsync(TopicName.get(successTopicName).getNamespaceObject()))
-                
.thenReturn(CompletableFuture.completedFuture(Optional.of(policies)));
+        testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
+                        
.createPolicies(TopicName.get(successTopicName).getNamespaceObject(),
+                                policies);
         testMaxProducers();
     }
 
-    private Producer getMockedProducerWithSpecificAddress(Topic topic, long 
producerId, InetAddress address)
-            throws Exception {
+    private Producer getMockedProducerWithSpecificAddress(Topic topic, long 
producerId, InetAddress address) {
         final String producerNameBase = "producer";
         final String role = "appid1";
 
-        ServerCnx cnx = spyWithClassAndConstructorArgs(ServerCnx.class, 
pulsar);
+        ServerCnx cnx = testPulsarServiceFactory.createServerCnxSpy();
         doReturn(true).when(cnx).isActive();
         doReturn(true).when(cnx).isWritable();
         doReturn(new InetSocketAddress(address, 
1234)).when(cnx).clientAddress();
@@ -623,9 +550,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
     @Test
     public void testMaxSameAddressProducers() throws Exception {
         // set max clients
-        ServiceConfiguration svcConfig = spy(ServiceConfiguration.class);
-        doReturn(2).when(svcConfig).getMaxSameAddressProducersPerTopic();
-        doReturn(svcConfig).when(pulsar).getConfiguration();
+        
testPulsarServiceFactory.getConfig().setMaxSameAddressProducersPerTopic(2);
 
         PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
 
@@ -719,14 +644,13 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
     }
 
     private SubscriptionOption getSubscriptionOption(CommandSubscribe cmd) {
-        SubscriptionOption subscriptionOption = 
SubscriptionOption.builder().cnx(serverCnx)
+        return SubscriptionOption.builder().cnx(serverCnx)
                 
.subscriptionName(cmd.getSubscription()).consumerId(cmd.getConsumerId()).subType(cmd.getSubType())
                 
.priorityLevel(0).consumerName(cmd.getConsumerName()).isDurable(cmd.isDurable()).startMessageId(null)
                 .metadata(Collections.emptyMap()).readCompacted(false)
                 
.initialPosition(InitialPosition.Latest).subscriptionProperties(Optional.empty())
                 
.startMessageRollbackDurationSec(0).replicatedSubscriptionStateArg(false).keySharedMeta(null)
                 .build();
-        return subscriptionOption;
     }
 
     @Test
@@ -929,33 +853,21 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
     @Test
     public void testMaxConsumersSharedForBroker() throws Exception {
         // set max clients
-        ServiceConfiguration svcConfig = spy(ServiceConfiguration.class);
-        doReturn(2).when(svcConfig).getMaxConsumersPerSubscription();
-        doReturn(3).when(svcConfig).getMaxConsumersPerTopic();
-        doReturn(svcConfig).when(pulsar).getConfiguration();
-
+        testPulsarServiceFactory.getConfig().setMaxConsumersPerSubscription(2);
+        testPulsarServiceFactory.getConfig().setMaxConsumersPerTopic(3);
         testMaxConsumersShared();
     }
 
     @Test
     public void testMaxConsumersSharedForNamespace() throws Exception {
-        ServiceConfiguration svcConfig = spy(ServiceConfiguration.class);
-        doReturn(svcConfig).when(pulsar).getConfiguration();
-
         // set max clients
         Policies policies = new Policies();
         policies.max_consumers_per_subscription = 2;
         policies.max_consumers_per_topic = 3;
 
-        when(pulsar.getPulsarResources().getNamespaceResources()
-                
.getPolicies(TopicName.get(successTopicName).getNamespaceObject()))
-                .thenReturn(Optional.of(policies));
-        when(pulsar.getPulsarResources().getNamespaceResources()
-                
.getPoliciesIfCached(TopicName.get(successTopicName).getNamespaceObject()))
-                .thenReturn(Optional.of(policies));
-        when(pulsar.getPulsarResources().getNamespaceResources()
-                
.getPoliciesAsync(TopicName.get(successTopicName).getNamespaceObject()))
-                
.thenReturn(CompletableFuture.completedFuture(Optional.of(policies)));
+        testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
+                
.createPolicies(TopicName.get(successTopicName).getNamespaceObject(),
+                        policies);
 
         testMaxConsumersShared();
     }
@@ -1037,42 +949,32 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
     @Test
     public void testMaxConsumersFailoverForBroker() throws Exception {
         // set max clients
-        ServiceConfiguration svcConfig = spy(ServiceConfiguration.class);
-        doReturn(2).when(svcConfig).getMaxConsumersPerSubscription();
-        doReturn(3).when(svcConfig).getMaxConsumersPerTopic();
-        doReturn(svcConfig).when(pulsar).getConfiguration();
+        testPulsarServiceFactory.getConfig().setMaxConsumersPerSubscription(2);
+        testPulsarServiceFactory.getConfig().setMaxConsumersPerTopic(3);
 
         testMaxConsumersFailover();
     }
 
     @Test
     public void testMaxConsumersFailoverForNamespace() throws Exception {
-        ServiceConfiguration svcConfig = spy(ServiceConfiguration.class);
-        doReturn(svcConfig).when(pulsar).getConfiguration();
-
         // set max clients
         Policies policies = new Policies();
         policies.max_consumers_per_subscription = 2;
         policies.max_consumers_per_topic = 3;
 
-        when(pulsar.getPulsarResources().getNamespaceResources()
-                
.getPolicies(TopicName.get(successTopicName).getNamespaceObject()))
-                .thenReturn(Optional.of(policies));
-        when(pulsar.getPulsarResources().getNamespaceResources()
-                
.getPoliciesIfCached(TopicName.get(successTopicName).getNamespaceObject()))
-                .thenReturn(Optional.of(policies));
-        when(pulsar.getPulsarResources().getNamespaceResources()
-                
.getPoliciesAsync(TopicName.get(successTopicName).getNamespaceObject()))
-                
.thenReturn(CompletableFuture.completedFuture(Optional.of(policies)));
+        testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
+                
.createPolicies(TopicName.get(successTopicName).getNamespaceObject(),
+                        policies);
+
         testMaxConsumersFailover();
     }
 
     private Consumer getMockedConsumerWithSpecificAddress(Topic topic, 
Subscription sub, long consumerId,
-                                                          InetAddress address) 
throws Exception {
+                                                          InetAddress address) 
{
         final String consumerNameBase = "consumer";
         final String role = "appid1";
 
-        ServerCnx cnx = spyWithClassAndConstructorArgs(ServerCnx.class, 
pulsar);
+        ServerCnx cnx = testPulsarServiceFactory.createServerCnxSpy();
         doReturn(true).when(cnx).isActive();
         doReturn(true).when(cnx).isWritable();
         doReturn(new InetSocketAddress(address, 
1234)).when(cnx).clientAddress();
@@ -1086,9 +988,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
     @Test
     public void testMaxSameAddressConsumers() throws Exception {
         // set max clients
-        ServiceConfiguration svcConfig = spy(ServiceConfiguration.class);
-        doReturn(2).when(svcConfig).getMaxSameAddressConsumersPerTopic();
-        doReturn(svcConfig).when(pulsar).getConfiguration();
+        
testPulsarServiceFactory.getConfig().setMaxSameAddressConsumersPerTopic(2);
 
         PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
         PersistentSubscription sub1 = new PersistentSubscription(topic, 
"sub1", cursorMock, false);
@@ -1195,13 +1095,10 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
                 true, serverCnx, "myrole-1", Collections.emptyMap(), false /* 
read compacted */, null, MessageId.latest, DEFAULT_CONSUMER_EPOCH);
         sub.addConsumer(consumer1);
 
-        doAnswer(new Answer<Object>() {
-            @Override
-            public Object answer(InvocationOnMock invocationOnMock) throws 
Throwable {
-                ((DeleteCursorCallback) 
invocationOnMock.getArguments()[1]).deleteCursorComplete(null);
-                Thread.sleep(1000);
-                return null;
-            }
+        doAnswer(invocationOnMock -> {
+            ((DeleteCursorCallback) 
invocationOnMock.getArguments()[1]).deleteCursorComplete(null);
+            Thread.sleep(1000);
+            return null;
         }).when(ledgerMock).asyncDeleteCursor(matches(".*success.*"), 
any(DeleteCursorCallback.class), any());
 
         @Cleanup("shutdownNow")
@@ -1294,7 +1191,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         topic.addProducer(producer, new CompletableFuture<>()).join();
 
         CompletableFuture<Void> cf = topic.delete();
-        Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> 
cf.isDone());
+        Awaitility.await().atMost(30, TimeUnit.SECONDS).until(cf::isDone);
         assertTrue(cf.isCompletedExceptionally());
         assertFalse((boolean) isFencedField.get(topic));
         assertFalse((boolean) isClosingOrDeletingField.get(topic));
@@ -1313,7 +1210,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         f1.get();
 
         CompletableFuture<Void> cf2 = topic.delete();
-        Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> 
cf2.isDone());
+        Awaitility.await().atMost(30, TimeUnit.SECONDS).until(cf2::isDone);
         assertTrue(cf2.isCompletedExceptionally());
         assertFalse((boolean) isFencedField.get(topic));
         assertFalse((boolean) isClosingOrDeletingField.get(topic));
@@ -1340,37 +1237,31 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         final CountDownLatch counter = new CountDownLatch(2);
         final AtomicBoolean gotException = new AtomicBoolean(false);
 
-        Thread deleter = new Thread() {
-            @Override
-            public void run() {
-                try {
-                    barrier.await();
-                    assertFalse(topic.delete().isCompletedExceptionally());
-                } catch (Exception e) {
-                    e.printStackTrace();
-                    gotException.set(true);
-                } finally {
-                    counter.countDown();
-                }
+        Thread deleter = new Thread(() -> {
+            try {
+                barrier.await();
+                assertFalse(topic.delete().isCompletedExceptionally());
+            } catch (Exception e) {
+                e.printStackTrace();
+                gotException.set(true);
+            } finally {
+                counter.countDown();
             }
-        };
+        });
 
-        Thread unsubscriber = new Thread() {
-            @Override
-            public void run() {
-                try {
-                    barrier.await();
-
-                    // do topic unsubscribe
-                    topic.unsubscribe(successSubName);
-                } catch (Exception e) {
-                    e.printStackTrace();
-                    gotException.set(true);
-                } finally {
-                    counter.countDown();
-                }
+        Thread unsubscriber = new Thread(() -> {
+            try {
+                barrier.await();
+
+                // do topic unsubscribe
+                topic.unsubscribe(successSubName);
+            } catch (Exception e) {
+                e.printStackTrace();
+                gotException.set(true);
+            } finally {
+                counter.countDown();
             }
-        };
+        });
 
         deleter.start();
         unsubscriber.start();
@@ -1398,42 +1289,36 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         final CountDownLatch counter = new CountDownLatch(2);
         final AtomicBoolean gotException = new AtomicBoolean(false);
 
-        Thread deleter = new Thread() {
-            @Override
-            public void run() {
-                try {
-                    barrier.await();
-                    // assertTrue(topic.unsubscribe(successSubName).isDone());
-                    Thread.sleep(5, 0);
-                    log.info("deleter outcome is {}", topic.delete().get());
-                } catch (Exception e) {
-                    e.printStackTrace();
-                    gotException.set(true);
-                } finally {
-                    counter.countDown();
-                }
+        Thread deleter = new Thread(() -> {
+            try {
+                barrier.await();
+                // assertTrue(topic.unsubscribe(successSubName).isDone());
+                Thread.sleep(5, 0);
+                log.info("deleter outcome is {}", topic.delete().get());
+            } catch (Exception e) {
+                e.printStackTrace();
+                gotException.set(true);
+            } finally {
+                counter.countDown();
             }
-        };
+        });
 
-        Thread unsubscriber = new Thread() {
-            @Override
-            public void run() {
-                try {
-                    barrier.await();
-                    // do subscription delete
-                    ConcurrentOpenHashMap<String, PersistentSubscription> 
subscriptions = topic.getSubscriptions();
-                    PersistentSubscription ps = 
subscriptions.get(successSubName);
-                    // Thread.sleep(5,0);
-                    log.info("unsubscriber outcome is {}", 
ps.doUnsubscribe(ps.getConsumers().get(0)).get());
-                    // assertFalse(ps.delete().isCompletedExceptionally());
-                } catch (Exception e) {
-                    e.printStackTrace();
-                    gotException.set(true);
-                } finally {
-                    counter.countDown();
-                }
+        Thread unsubscriber = new Thread(() -> {
+            try {
+                barrier.await();
+                // do subscription delete
+                ConcurrentOpenHashMap<String, PersistentSubscription> 
subscriptions = topic.getSubscriptions();
+                PersistentSubscription ps = subscriptions.get(successSubName);
+                // Thread.sleep(5,0);
+                log.info("unsubscriber outcome is {}", 
ps.doUnsubscribe(ps.getConsumers().get(0)).get());
+                // assertFalse(ps.delete().isCompletedExceptionally());
+            } catch (Exception e) {
+                e.printStackTrace();
+                gotException.set(true);
+            } finally {
+                counter.countDown();
             }
-        };
+        });
 
         deleter.start();
         unsubscriber.start();
@@ -1447,13 +1332,10 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         PersistentTopic topic = (PersistentTopic) 
brokerService.getOrCreateTopic(successTopicName).get();
 
         // override ledger deletion callback to slow down deletion
-        doAnswer(new Answer<Object>() {
-            @Override
-            public Object answer(InvocationOnMock invocationOnMock) throws 
Throwable {
-                Thread.sleep(1000);
-                ((DeleteLedgerCallback) 
invocationOnMock.getArguments()[0]).deleteLedgerComplete(null);
-                return null;
-            }
+        doAnswer(invocationOnMock -> {
+            Thread.sleep(1000);
+            ((DeleteLedgerCallback) 
invocationOnMock.getArguments()[0]).deleteLedgerComplete(null);
+            return null;
         }).when(ledgerMock).asyncDelete(any(DeleteLedgerCallback.class), 
any());
 
         @Cleanup("shutdownNow")
@@ -1500,16 +1382,13 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         cursorMock = mock(ManagedCursorImpl.class);
         final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
 
-        doReturn(new ArrayList<Object>()).when(ledgerMock).getCursors();
+        doReturn(new ArrayList<>()).when(ledgerMock).getCursors();
         doReturn("mockCursor").when(cursorMock).getName();
         doReturn(true).when(cursorMock).isDurable();
         // doNothing().when(cursorMock).asyncClose(new CloseCallback() {
-        doAnswer(new Answer<Object>() {
-            @Override
-            public Object answer(InvocationOnMock invocationOnMock) throws 
Throwable {
-                // return closeFuture.get();
-                return closeFuture.complete(null);
-            }
+        doAnswer((Answer<Object>) invocationOnMock -> {
+            // return closeFuture.get();
+            return closeFuture.complete(null);
         })
 
                 .when(cursorMock).asyncClose(new CloseCallback() {
@@ -1530,78 +1409,56 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
                 }, null);
 
         // call openLedgerComplete with ledgerMock on ML factory asyncOpen
-        doAnswer(new Answer<Object>() {
-            @Override
-            public Object answer(InvocationOnMock invocationOnMock) throws 
Throwable {
-                ((OpenLedgerCallback) 
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
-                return null;
-            }
-        }).when(mlFactoryMock).asyncOpen(matches(".*success.*"), 
any(ManagedLedgerConfig.class),
-                any(OpenLedgerCallback.class), any(Supplier.class), any());
+        doAnswer(invocationOnMock -> {
+            ((OpenLedgerCallback) 
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
+            return null;
+        }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+                .asyncOpen(matches(".*success.*"), 
any(ManagedLedgerConfig.class),
+                        any(OpenLedgerCallback.class), any(Supplier.class), 
any());
 
         // call openLedgerFailed on ML factory asyncOpen
-        doAnswer(new Answer<Object>() {
-            @Override
-            public Object answer(InvocationOnMock invocationOnMock) throws 
Throwable {
-                ((OpenLedgerCallback) invocationOnMock.getArguments()[2])
-                        .openLedgerFailed(new ManagedLedgerException("Managed 
ledger failure"), null);
-                return null;
-            }
-        }).when(mlFactoryMock).asyncOpen(matches(".*fail.*"), 
any(ManagedLedgerConfig.class),
-                any(OpenLedgerCallback.class), any(Supplier.class), any());
+        doAnswer(invocationOnMock -> {
+            ((OpenLedgerCallback) invocationOnMock.getArguments()[2])
+                    .openLedgerFailed(new ManagedLedgerException("Managed 
ledger failure"), null);
+            return null;
+        }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+                .asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class),
+                        any(OpenLedgerCallback.class), any(Supplier.class), 
any());
 
         // call addComplete on ledger asyncAddEntry
-        doAnswer(new Answer<Object>() {
-            @Override
-            public Object answer(InvocationOnMock invocationOnMock) throws 
Throwable {
-                ((AddEntryCallback) 
invocationOnMock.getArguments()[1]).addComplete(new PositionImpl(1, 1),
-                        null,
-                        invocationOnMock.getArguments()[2]);
-                return null;
-            }
+        doAnswer(invocationOnMock -> {
+            ((AddEntryCallback) 
invocationOnMock.getArguments()[1]).addComplete(new PositionImpl(1, 1),
+                    null,
+                    invocationOnMock.getArguments()[2]);
+            return null;
         }).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), 
any(AddEntryCallback.class), any());
 
         // call openCursorComplete on cursor asyncOpen
-        doAnswer(new Answer<Object>() {
-            @Override
-            public Object answer(InvocationOnMock invocationOnMock) throws 
Throwable {
-                ((OpenCursorCallback) 
invocationOnMock.getArguments()[2]).openCursorComplete(cursorMock, null);
-                return null;
-            }
+        doAnswer(invocationOnMock -> {
+            ((OpenCursorCallback) 
invocationOnMock.getArguments()[2]).openCursorComplete(cursorMock, null);
+            return null;
         }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), 
any(InitialPosition.class), any(OpenCursorCallback.class), any());
 
-        doAnswer(new Answer<Object>() {
-            @Override
-            public Object answer(InvocationOnMock invocationOnMock) throws 
Throwable {
-                ((OpenCursorCallback) 
invocationOnMock.getArguments()[4]).openCursorComplete(cursorMock, null);
-                return null;
-            }
+        doAnswer(invocationOnMock -> {
+            ((OpenCursorCallback) 
invocationOnMock.getArguments()[4]).openCursorComplete(cursorMock, null);
+            return null;
         }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), 
any(InitialPosition.class), any(Map.class),
                 any(Map.class), any(OpenCursorCallback.class), any());
 
-        doAnswer(new Answer<Object>() {
-            @Override
-            public Object answer(InvocationOnMock invocationOnMock) throws 
Throwable {
-                ((CloseCallback) 
invocationOnMock.getArguments()[0]).closeComplete(null);
-                return null;
-            }
+        doAnswer(invocationOnMock -> {
+            ((CloseCallback) 
invocationOnMock.getArguments()[0]).closeComplete(null);
+            return null;
         }).when(ledgerMock).asyncClose(any(CloseCallback.class), any());
 
         // call deleteLedgerComplete on ledger asyncDelete
-        doAnswer(new Answer<Object>() {
-            @Override
-            public Object answer(InvocationOnMock invocationOnMock) throws 
Throwable {
-                ((DeleteLedgerCallback) 
invocationOnMock.getArguments()[0]).deleteLedgerComplete(null);
-                return null;
-            }
+        doAnswer(invocationOnMock -> {
+            ((DeleteLedgerCallback) 
invocationOnMock.getArguments()[0]).deleteLedgerComplete(null);
+            return null;
         }).when(ledgerMock).asyncDelete(any(DeleteLedgerCallback.class), 
any());
 
-        doAnswer(new Answer<Object>() {
-            @Override
-            public Object answer(InvocationOnMock invocationOnMock) throws 
Throwable {
-                ((DeleteCursorCallback) 
invocationOnMock.getArguments()[1]).deleteCursorComplete(null);
-                return null;
-            }
+        doAnswer(invocationOnMock -> {
+            ((DeleteCursorCallback) 
invocationOnMock.getArguments()[1]).deleteCursorComplete(null);
+            return null;
         }).when(ledgerMock).asyncDeleteCursor(matches(".*success.*"), 
any(DeleteCursorCallback.class), any());
 
         doAnswer((invokactionOnMock) -> {
@@ -1630,13 +1487,9 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         assertFalse((boolean) isFencedField.get(topic));
         assertFalse((boolean) isClosingOrDeletingField.get(topic));
 
-        metadataStore.failConditional(new MetadataStoreException("injected 
error"), (op, path) -> {
-            if (op == FaultInjectionMetadataStore.OperationType.PUT
-                && 
path.equals("/admin/partitioned-topics/prop/use/ns-abc/persistent/successTopic"))
 {
-                return true;
-            }
-            return false;
-        });
+        metadataStore.failConditional(new MetadataStoreException("injected 
error"), (op, path) ->
+                op == FaultInjectionMetadataStore.OperationType.PUT &&
+                        
path.equals("/admin/partitioned-topics/prop/use/ns-abc/persistent/successTopic"));
         try {
             topic.delete().get();
             fail();
@@ -1830,8 +1683,10 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         String remoteReplicatorName = topic.getReplicatorPrefix() + "." + 
remoteCluster;
         ConcurrentOpenHashMap<String, Replicator> replicatorMap = 
topic.getReplicators();
 
+        PulsarService pulsar = testPulsarServiceFactory.getPulsarService();
         final URL brokerUrl = new URL(
-                "http://" + pulsar.getAdvertisedAddress() + ":" + 
pulsar.getConfiguration().getBrokerServicePort().get());
+                "http://" + pulsar.getAdvertisedAddress() + ":" + 
pulsar.getConfiguration().getBrokerServicePort()
+                        .get());
         @Cleanup
         PulsarClient client = 
PulsarClient.builder().serviceUrl(brokerUrl.toString()).build();
         ManagedCursor cursor = mock(ManagedCursorImpl.class);
@@ -1871,12 +1726,14 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         String remoteCluster = "remote";
         final ManagedLedger ledgerMock = mock(ManagedLedger.class);
         doNothing().when(ledgerMock).asyncDeleteCursor(any(), any(), any());
-        doReturn(new ArrayList<Object>()).when(ledgerMock).getCursors();
+        doReturn(new ArrayList<>()).when(ledgerMock).getCursors();
 
         PersistentTopic topic = new PersistentTopic(globalTopicName, 
ledgerMock, brokerService);
 
+        PulsarService pulsar = testPulsarServiceFactory.getPulsarService();
         final URL brokerUrl = new URL(
-                "http://" + pulsar.getAdvertisedAddress() + ":" + 
pulsar.getConfiguration().getBrokerServicePort().get());
+                "http://" + pulsar.getAdvertisedAddress() + ":" + 
pulsar.getConfiguration().getBrokerServicePort()
+                        .get());
         @Cleanup
         PulsarClient client = 
spy(PulsarClient.builder().serviceUrl(brokerUrl.toString()).build());
         PulsarClientImpl clientImpl = (PulsarClientImpl) client;
@@ -1907,7 +1764,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
     }
 
     @Test
-    public void testCompactorSubscription() throws Exception {
+    public void testCompactorSubscription() {
         PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
         CompactedTopic compactedTopic = mock(CompactedTopic.class);
         when(compactedTopic.newCompactedLedger(any(Position.class), anyLong()))
@@ -1924,7 +1781,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
 
 
     @Test
-    public void testCompactorSubscriptionUpdatedOnInit() throws Exception {
+    public void testCompactorSubscriptionUpdatedOnInit() {
         long ledgerId = 0xc0bfefeL;
         Map<String, Long> properties = 
Map.of(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY, ledgerId);
         PositionImpl position = new PositionImpl(1, 1);
@@ -1943,15 +1800,15 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
     @Test
     public void testCompactionTriggeredAfterThresholdFirstInvocation() throws 
Exception {
         CompletableFuture<Long> compactPromise = new CompletableFuture<>();
-        Compactor compactor = pulsar.getCompactor();
+        Compactor compactor = 
testPulsarServiceFactory.getPulsarService().getCompactor();
         doReturn(compactPromise).when(compactor).compact(anyString());
 
         Policies policies = new Policies();
         policies.compaction_threshold = 1L;
 
-        NamespaceResources nsr = 
pulsar.getPulsarResources().getNamespaceResources();
-        NamespaceName ns = 
TopicName.get(successTopicName).getNamespaceObject();
-        
doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(nsr).getPoliciesAsync(ns);
+        testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
+                
.createPolicies(TopicName.get(successTopicName).getNamespaceObject(),
+                        policies);
 
         PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
         topic.initialize().get();
@@ -1974,7 +1831,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
     @Test
     public void testCompactionTriggeredAfterThresholdSecondInvocation() throws 
Exception {
         CompletableFuture<Long> compactPromise = new CompletableFuture<>();
-        Compactor compactor = pulsar.getCompactor();
+        Compactor compactor = 
testPulsarServiceFactory.getPulsarService().getCompactor();
         doReturn(compactPromise).when(compactor).compact(anyString());
 
         ManagedCursor subCursor = mock(ManagedCursor.class);
@@ -1984,9 +1841,9 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         Policies policies = new Policies();
         policies.compaction_threshold = 1L;
 
-        NamespaceResources nsr = 
pulsar.getPulsarResources().getNamespaceResources();
-        NamespaceName ns = 
TopicName.get(successTopicName).getNamespaceObject();
-        
doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(nsr).getPoliciesAsync(ns);
+        testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
+                
.createPolicies(TopicName.get(successTopicName).getNamespaceObject(),
+                        policies);
 
         PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
         topic.initialize().get();
@@ -2008,15 +1865,15 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
     @Test
     public void testCompactionDisabledWithZeroThreshold() throws Exception {
         CompletableFuture<Long> compactPromise = new CompletableFuture<>();
-        Compactor compactor = pulsar.getCompactor();
+        Compactor compactor = 
testPulsarServiceFactory.getPulsarService().getCompactor();
         doReturn(compactPromise).when(compactor).compact(anyString());
 
         Policies policies = new Policies();
         policies.compaction_threshold = 0L;
 
-        NamespaceResources nsr = 
pulsar.getPulsarResources().getNamespaceResources();
-        NamespaceName ns = 
TopicName.get(successTopicName).getNamespaceObject();
-        
doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(nsr).getPoliciesAsync(ns);
+        testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
+                
.createPolicies(TopicName.get(successTopicName).getNamespaceObject(),
+                        policies);
 
         doReturn(1000L).when(ledgerMock).getEstimatedBacklogSize();
 
@@ -2029,7 +1886,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
     @Test
     public void testBacklogCursor() throws Exception {
         int backloggedThreshold = 10;
-        
pulsar.getConfiguration().setManagedLedgerCursorBackloggedThreshold(backloggedThreshold);
+        
testPulsarServiceFactory.getConfig().setManagedLedgerCursorBackloggedThreshold(backloggedThreshold);
 
         ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
factory.open("cache_backlog_ledger");
         PersistentTopic topic = new PersistentTopic(successTopicName, ledger, 
brokerService);
@@ -2170,13 +2027,11 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
                 true, serverCnx, "app1", Collections.emptyMap(), false, null, 
MessageId.latest, DEFAULT_CONSUMER_EPOCH);
         addConsumerToSubscription.invoke(topic, nonDeletableSubscription1, 
consumer);
 
-        NamespaceResources nsr = 
pulsar.getPulsarResources().getNamespaceResources();
-        NamespaceName ns = 
TopicName.get(successTopicName).getNamespaceObject();
-        doReturn(Optional.of(new Policies())).when(nsr).getPolicies(ns);
+        testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
+                
.createPolicies(TopicName.get(successTopicName).getNamespaceObject(),
+                        new Policies());
 
-        ServiceConfiguration svcConfig = spy(ServiceConfiguration.class);
-        doReturn(5).when(svcConfig).getSubscriptionExpirationTimeMinutes();
-        doReturn(svcConfig).when(pulsar).getConfiguration();
+        
testPulsarServiceFactory.getConfig().setSubscriptionExpirationTimeMinutes(5);
 
         doReturn(System.currentTimeMillis() - 
TimeUnit.MINUTES.toMillis(6)).when(cursorMock).getLastActive();
 
@@ -2189,8 +2044,6 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
 
     @Test
     public void testTopicFencingTimeout() throws Exception {
-        ServiceConfiguration svcConfig = spy(ServiceConfiguration.class);
-        doReturn(svcConfig).when(pulsar).getConfiguration();
         PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
 
         Method fence = PersistentTopic.class.getDeclaredMethod("fence");
@@ -2205,7 +2058,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         Field isClosingOrDeletingField = 
PersistentTopic.class.getDeclaredField("isClosingOrDeleting");
         isClosingOrDeletingField.setAccessible(true);
 
-        doReturn(10).when(svcConfig).getTopicFencingTimeoutSeconds();
+        testPulsarServiceFactory.getConfig().setTopicFencingTimeoutSeconds(10);
         fence.invoke(topic);
         unfence.invoke(topic);
         ScheduledFuture<?> fencedTopicMonitoringTask = (ScheduledFuture<?>) 
fencedTopicMonitoringTaskField.get(topic);
@@ -2214,7 +2067,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         assertFalse((boolean) isFencedField.get(topic));
         assertFalse((boolean) isClosingOrDeletingField.get(topic));
 
-        doReturn(1).when(svcConfig).getTopicFencingTimeoutSeconds();
+        testPulsarServiceFactory.getConfig().setTopicFencingTimeoutSeconds(1);
         fence.invoke(topic);
         Thread.sleep(2000);
         fencedTopicMonitoringTask = (ScheduledFuture<?>) 
fencedTopicMonitoringTaskField.get(topic);
@@ -2226,7 +2079,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
 
     @Test
     public void testTopicCloseFencingTimeout() throws Exception {
-        pulsar.getConfiguration().setTopicFencingTimeoutSeconds(10);
+        testPulsarServiceFactory.getConfig().setTopicFencingTimeoutSeconds(10);
         Method fence = PersistentTopic.class.getDeclaredMethod("fence");
         fence.setAccessible(true);
         Field fencedTopicMonitoringTaskField = 
PersistentTopic.class.getDeclaredField("fencedTopicMonitoringTask");
@@ -2289,7 +2142,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
     }
 
     @Test
-    public void testDisconnectProducer() throws Exception {
+    public void testDisconnectProducer() {
         PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
         String role = "appid1";
         Producer producer = new Producer(topic, serverCnx, 1 /* producer id 
*/, "prod-name",
@@ -2300,7 +2153,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         producer.disconnect();
         producer.disconnect();
         verify(serverCnx).execute(any());
-    };
+    }
 
     @Test
     public void testKeySharedMetadataExposedToStats() throws Exception {
@@ -2360,32 +2213,24 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
     }
 
     @Test
-    public void testGetReplicationClusters() throws Exception {
+    public void testGetReplicationClusters() throws MetadataStoreException {
         PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
         topic.initialize();
         
assertEquals(topic.getHierarchyTopicPolicies().getReplicationClusters().get(), 
Collections.emptyList());
 
-        PulsarResources pulsarResources = 
spyWithClassAndConstructorArgs(PulsarResources.class, metadataStore, 
metadataStore);
-        NamespaceResources nsr = 
spyWithClassAndConstructorArgs(NamespaceResources.class, metadataStore, 30);
-        doReturn(nsr).when(pulsarResources).getNamespaceResources();
-        PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
-            doReturn(pulsarResources).when(pulsar).getPulsarResources();
-        });
-        CompletableFuture<Optional<Policies>> policiesFuture = new 
CompletableFuture<>();
         Policies policies = new Policies();
         Set<String> namespaceClusters = new HashSet<>();
         namespaceClusters.add("namespace-cluster");
         policies.replication_clusters = namespaceClusters;
-        Optional<Policies> optionalPolicies = Optional.of(policies);
-        policiesFuture.complete(optionalPolicies);
-        doReturn(policiesFuture).when(nsr).getPoliciesAsync(any());
+        testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
+                
.createPolicies(TopicName.get(successTopicName).getNamespaceObject(), policies);
 
         topic = new PersistentTopic(successTopicName, ledgerMock, 
brokerService);
         topic.initialize();
         
assertEquals(topic.getHierarchyTopicPolicies().getReplicationClusters().get(), 
namespaceClusters);
 
         TopicPoliciesService topicPoliciesService = 
mock(TopicPoliciesService.class);
-        doReturn(topicPoliciesService).when(pulsar).getTopicPoliciesService();
+        
doReturn(topicPoliciesService).when(testPulsarServiceFactory.getPulsarService()).getTopicPoliciesService();
         CompletableFuture<Optional<TopicPolicies>> topicPoliciesFuture = new 
CompletableFuture<>();
         TopicPolicies topicPolicies = new TopicPolicies();
         List<String> topicClusters = new ArrayList<>();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxAuthorizationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxAuthorizationTest.java
index a2fc6d8130f..325cb8d84e4 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxAuthorizationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxAuthorizationTest.java
@@ -16,16 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.pulsar.broker.service;
 
 import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgsRecordingInvocations;
-import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
-import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
 import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertEquals;
@@ -38,8 +36,6 @@ import io.jsonwebtoken.SignatureAlgorithm;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPipeline;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.charset.StandardCharsets;
@@ -47,24 +43,15 @@ import java.util.Base64;
 import java.util.Collections;
 import java.util.Optional;
 import java.util.Properties;
-import java.util.concurrent.CompletableFuture;
 import javax.crypto.SecretKey;
-import org.apache.bookkeeper.common.util.OrderedExecutor;
-import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
-import org.apache.pulsar.broker.PulsarService;
-import org.apache.pulsar.broker.PulsarServiceMockSupport;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.TestPulsarService;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
 import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
-import org.apache.pulsar.broker.intercept.BrokerInterceptor;
-import org.apache.pulsar.broker.resources.NamespaceResources;
-import org.apache.pulsar.broker.resources.PulsarResources;
-import org.apache.pulsar.broker.resources.TenantResources;
-import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.auth.AuthenticationToken;
 import org.apache.pulsar.common.api.proto.CommandConnect;
@@ -74,10 +61,8 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TopicOperation;
-import org.apache.pulsar.metadata.api.MetadataStore;
-import org.apache.pulsar.metadata.impl.ZKMetadataStore;
-import org.apache.zookeeper.ZooKeeper;
 import org.mockito.ArgumentMatcher;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
@@ -89,15 +74,14 @@ public class ServerCnxAuthorizationTest {
     private final String CLIENT_TOKEN = 
Jwts.builder().setSubject(CLIENT_PRINCIPAL).signWith(SECRET_KEY).compact();
     private final String PROXY_TOKEN = 
Jwts.builder().setSubject(PROXY_PRINCIPAL).signWith(SECRET_KEY).compact();
 
-    private PulsarService pulsar;
-    private PulsarResources pulsarResources;
-    private BrokerService brokerService;
     private ServiceConfiguration svcConfig;
 
+    protected TestPulsarService.Factory testPulsarServiceFactory;
+    private BrokerService brokerService;
+
     @BeforeMethod(alwaysRun = true)
     public void beforeMethod() throws Exception {
-        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
-        svcConfig = spy(ServiceConfiguration.class);
+        svcConfig = new ServiceConfiguration();
         svcConfig.setKeepAliveIntervalSeconds(0);
         svcConfig.setBrokerShutdownTimeoutMs(0L);
         svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
@@ -111,57 +95,30 @@ public class ServerCnxAuthorizationTest {
         properties.setProperty("tokenSecretKey", "data:;base64,"
                 + Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded()));
         svcConfig.setProperties(properties);
+        testPulsarServiceFactory = TestPulsarService.Factory.builder()
+                .config(svcConfig)
+                .useSpies(true)
+                .build();
+        brokerService = testPulsarServiceFactory.getBrokerService();
+
+        
testPulsarServiceFactory.getPulsarResources().getTenantResources().createTenant("public",
+                TenantInfo.builder().build());
+    }
 
-        pulsar = 
spyWithClassAndConstructorArgsRecordingInvocations(PulsarService.class, 
svcConfig);
-        doReturn(new 
DefaultSchemaRegistryService()).when(pulsar).getSchemaRegistryService();
-
-        doReturn(svcConfig).when(pulsar).getConfiguration();
-        PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
-            
doReturn(mock(PulsarResources.class)).when(pulsar).getPulsarResources();
-        });
-
-
-        ManagedLedgerFactory mlFactoryMock = mock(ManagedLedgerFactory.class);
-        doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
-
-        ZooKeeper mockZk = createMockZooKeeper();
-        OrderedExecutor executor = 
OrderedExecutor.newBuilder().numThreads(1).build();
-        doReturn(createMockBookKeeper(executor))
-                .when(pulsar).getBookKeeperClient();
-
-        MetadataStore store = new ZKMetadataStore(mockZk);
-
-        doReturn(store).when(pulsar).getLocalMetadataStore();
-        doReturn(store).when(pulsar).getConfigurationMetadataStore();
-
-        pulsarResources = 
spyWithClassAndConstructorArgsRecordingInvocations(PulsarResources.class, 
store, store);
-        PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
-            doReturn(pulsarResources).when(pulsar).getPulsarResources();
-        });
-        NamespaceResources namespaceResources =
-                
spyWithClassAndConstructorArgsRecordingInvocations(NamespaceResources.class, 
store, 30);
-        
doReturn(namespaceResources).when(pulsarResources).getNamespaceResources();
-
-        TenantResources tenantResources = 
spyWithClassAndConstructorArgsRecordingInvocations(TenantResources.class, 
store, 30);
-        doReturn(tenantResources).when(pulsarResources).getTenantResources();
-
-        
doReturn(CompletableFuture.completedFuture(Optional.of(TenantInfo.builder().build()))).when(tenantResources)
-                .getTenantAsync("public");
-
-        brokerService = 
spyWithClassAndConstructorArgsRecordingInvocations(BrokerService.class, pulsar, 
eventLoopGroup);
-        BrokerInterceptor interceptor = mock(BrokerInterceptor.class);
-        doReturn(interceptor).when(brokerService).getInterceptor();
-        PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
-            doReturn(brokerService).when(pulsar).getBrokerService();
-            doReturn(executor).when(pulsar).getOrderedExecutor();
-        });
+    @AfterMethod(alwaysRun = true)
+    public void cleanup() throws Exception {
+        if (testPulsarServiceFactory != null) {
+            testPulsarServiceFactory.close();
+            testPulsarServiceFactory = null;
+        }
     }
 
     @Test
     public void testVerifyOriginalPrincipalWithAuthDataForwardedFromProxy() 
throws Exception {
-        doReturn(true).when(svcConfig).isAuthenticateOriginalAuthData();
+        svcConfig.setAuthenticateOriginalAuthData(true);
+
 
-        ServerCnx serverCnx = 
spyWithClassAndConstructorArgsRecordingInvocations(ServerCnx.class, pulsar);
+        ServerCnx serverCnx = testPulsarServiceFactory.createServerCnxSpy();
         ChannelHandlerContext channelHandlerContext = 
mock(ChannelHandlerContext.class);
         Channel channel = mock(Channel.class);
         ChannelPipeline channelPipeline = mock(ChannelPipeline.class);
@@ -197,7 +154,8 @@ public class ServerCnxAuthorizationTest {
         assertEquals(serverCnx.getAuthState().getAuthRole(), PROXY_PRINCIPAL);
 
         AuthorizationService authorizationService =
-                
spyWithClassAndConstructorArgsRecordingInvocations(AuthorizationService.class, 
svcConfig, pulsarResources);
+                
spyWithClassAndConstructorArgsRecordingInvocations(AuthorizationService.class, 
svcConfig,
+                        testPulsarServiceFactory.getPulsarResources());
         
doReturn(authorizationService).when(brokerService).getAuthorizationService();
 
         // lookup
@@ -265,9 +223,9 @@ public class ServerCnxAuthorizationTest {
 
     @Test
     public void testVerifyOriginalPrincipalWithoutAuthDataForwardedFromProxy() 
throws Exception {
-        doReturn(false).when(svcConfig).isAuthenticateOriginalAuthData();
+        svcConfig.setAuthenticateOriginalAuthData(false);
 
-        ServerCnx serverCnx = 
spyWithClassAndConstructorArgsRecordingInvocations(ServerCnx.class, pulsar);
+        ServerCnx serverCnx = testPulsarServiceFactory.createServerCnxSpy();
         ChannelHandlerContext channelHandlerContext = 
mock(ChannelHandlerContext.class);
         Channel channel = mock(Channel.class);
         ChannelPipeline channelPipeline = mock(ChannelPipeline.class);
@@ -298,7 +256,8 @@ public class ServerCnxAuthorizationTest {
         assertEquals(serverCnx.getAuthState().getAuthRole(), PROXY_PRINCIPAL);
 
         AuthorizationService authorizationService =
-                
spyWithClassAndConstructorArgsRecordingInvocations(AuthorizationService.class, 
svcConfig, pulsarResources);
+                
spyWithClassAndConstructorArgsRecordingInvocations(AuthorizationService.class, 
svcConfig,
+                        testPulsarServiceFactory.getPulsarResources());
         
doReturn(authorizationService).when(brokerService).getAuthorizationService();
 
         // lookup
@@ -359,7 +318,7 @@ public class ServerCnxAuthorizationTest {
 
     @Test
     public void testVerifyAuthRoleAndAuthDataFromDirectConnectionBroker() 
throws Exception {
-        ServerCnx serverCnx = 
spyWithClassAndConstructorArgsRecordingInvocations(ServerCnx.class, pulsar);
+        ServerCnx serverCnx = testPulsarServiceFactory.createServerCnxSpy();
 
         ChannelHandlerContext channelHandlerContext = 
mock(ChannelHandlerContext.class);
         Channel channel = mock(Channel.class);
@@ -390,7 +349,8 @@ public class ServerCnxAuthorizationTest {
         assertEquals(serverCnx.getAuthState().getAuthRole(), CLIENT_PRINCIPAL);
 
         AuthorizationService authorizationService =
-                spyWithClassAndConstructorArgsRecordingInvocations(AuthorizationService.class, svcConfig, pulsarResources);
+                spyWithClassAndConstructorArgsRecordingInvocations(AuthorizationService.class, svcConfig,
+                        testPulsarServiceFactory.getPulsarResources());
         doReturn(authorizationService).when(brokerService).getAuthorizationService();
 
         // lookup
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherFailoverConsumerStreamingDispatcherTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherFailoverConsumerStreamingDispatcherTest.java
index abedc1a6688..00e31c51c9c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherFailoverConsumerStreamingDispatcherTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherFailoverConsumerStreamingDispatcherTest.java
@@ -32,6 +32,6 @@ public class PersistentDispatcherFailoverConsumerStreamingDispatcherTest extends
     @BeforeMethod(alwaysRun = true)
     public void setup() throws Exception {
         super.setup();
-        pulsar.getConfiguration().setStreamingDispatch(true);
+        testPulsarServiceFactory.getConfig().setStreamingDispatch(true);
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicStreamingDispatcherTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicStreamingDispatcherTest.java
index e0740a234e1..d7699e0b817 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicStreamingDispatcherTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicStreamingDispatcherTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.PersistentTopicTest;
 import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
 import org.testng.annotations.BeforeMethod;
@@ -32,8 +33,9 @@ public class PersistentTopicStreamingDispatcherTest extends PersistentTopicTest
     @BeforeMethod(alwaysRun = true)
     public void setup() throws Exception {
         super.setup();
-        pulsar.getConfiguration().setTopicLevelPoliciesEnabled(false);
-        pulsar.getConfiguration().setSystemTopicEnabled(false);
-        pulsar.getConfiguration().setStreamingDispatch(true);
+        ServiceConfiguration config = testPulsarServiceFactory.getConfig();
+        config.setTopicLevelPoliciesEnabled(false);
+        config.setSystemTopicEnabled(false);
+        config.setStreamingDispatch(true);
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregatorTest.java
index 9bd0b1f1bd4..e63f644f3d0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregatorTest.java
@@ -25,7 +25,6 @@ import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
 import org.apache.bookkeeper.mledger.util.StatsBuckets;
 import org.apache.pulsar.broker.PulsarService;
-import org.apache.pulsar.broker.PulsarServiceMockSupport;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.Consumer;
@@ -60,10 +59,8 @@ public class NamespaceStatsAggregatorTest {
         doReturn(multiLayerTopicsMap).when(broker).getMultiLayerTopicMap();
         Mockito.when(pulsar.getLocalMetadataStore()).thenReturn(Mockito.mock(ZKMetadataStore.class));
         ServiceConfiguration mockConfig = Mockito.mock(ServiceConfiguration.class);
-        PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
-            doReturn(mockConfig).when(pulsar).getConfiguration();
-            doReturn(broker).when(pulsar).getBrokerService();
-        });
+        doReturn(mockConfig).when(pulsar).getConfiguration();
+        doReturn(broker).when(pulsar).getBrokerService();
     }
 
     @Test

Reply via email to