This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d9a71d7ff4e [improve][test] Refactor TestPulsarService to PulsarTestContext and add support for starting (#19337)
d9a71d7ff4e is described below

commit d9a71d7ff4e9b251ed51d3e3154b992dc58f6c74
Author: Lari Hotari <[email protected]>
AuthorDate: Mon Jan 30 20:51:39 2023 +0200

    [improve][test] Refactor TestPulsarService to PulsarTestContext and add support for starting (#19337)
---
 .../apache/pulsar/broker/TestPulsarService.java    | 501 ---------------------
 .../OwnerShipForCurrentServerTestBase.java         | 180 +-------
 .../broker/service/MessageCumulativeAckTest.java   |  16 +-
 .../PersistentDispatcherFailoverConsumerTest.java  |  38 +-
 .../pulsar/broker/service/PersistentTopicTest.java | 208 +++++----
 .../broker/service/ServerCnxAuthorizationTest.java |  30 +-
 .../pulsar/broker/service/ServerCnxTest.java       |  46 +-
 ...herFailoverConsumerStreamingDispatcherTest.java |   2 +-
 .../PersistentTopicStreamingDispatcherTest.java    |   2 +-
 .../testcontext/AbstractTestPulsarService.java     |  88 ++++
 .../testcontext/MockBookKeeperClientFactory.java   |  60 +++
 .../NonClosableMockBookKeeper.java}                |  35 +-
 .../broker/testcontext/NonClosingProxyHandler.java |  73 +++
 .../testcontext/NonStartableTestPulsarService.java | 196 ++++++++
 .../broker/testcontext/PulsarTestContext.java      | 411 +++++++++++++++++
 .../pulsar/broker/testcontext/SpyConfig.java       |  83 ++++
 .../testcontext/StartableTestPulsarService.java    |  37 ++
 17 files changed, 1180 insertions(+), 826 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/TestPulsarService.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TestPulsarService.java
deleted file mode 100644
index 95667f95a33..00000000000
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/TestPulsarService.java
+++ /dev/null
@@ -1,501 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.pulsar.broker;
-
-import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
-import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgsRecordingInvocations;
-import static org.mockito.Mockito.mock;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import java.io.IOException;
-import java.time.Duration;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Function;
-import java.util.function.Supplier;
-import lombok.AccessLevel;
-import lombok.Builder;
-import lombok.Getter;
-import lombok.Singular;
-import lombok.ToString;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.PulsarMockBookKeeper;
-import org.apache.bookkeeper.common.util.OrderedExecutor;
-import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
-import org.apache.bookkeeper.stats.NullStatsProvider;
-import org.apache.bookkeeper.stats.StatsProvider;
-import org.apache.pulsar.broker.namespace.NamespaceService;
-import org.apache.pulsar.broker.resources.NamespaceResources;
-import org.apache.pulsar.broker.resources.PulsarResources;
-import org.apache.pulsar.broker.resources.TopicResources;
-import org.apache.pulsar.broker.service.BrokerService;
-import org.apache.pulsar.broker.service.PulsarMetadataEventSynchronizer;
-import org.apache.pulsar.broker.service.ServerCnx;
-import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService;
-import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
-import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown;
-import org.apache.pulsar.compaction.Compactor;
-import org.apache.pulsar.metadata.api.MetadataStore;
-import org.apache.pulsar.metadata.api.MetadataStoreConfig;
-import org.apache.pulsar.metadata.api.MetadataStoreException;
-import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
-import org.apache.pulsar.metadata.impl.MetadataStoreFactoryImpl;
-import org.jetbrains.annotations.NotNull;
-
-/**
- * Subclass of PulsarService that is used for some tests.
- * This was written as a replacement for the previous Mockito Spy over 
PulsarService solution which caused
- * a flaky test issue https://github.com/apache/pulsar/issues/13620.
- */
-
-public class TestPulsarService extends PulsarService {
-
-
-    @Slf4j
-    @ToString
-    @Getter
-    @Builder
-    public static class Factory implements AutoCloseable {
-        private final ServiceConfiguration config;
-        private final MetadataStoreExtended localMetadataStore;
-        private final MetadataStoreExtended configurationMetadataStore;
-        private final PulsarResources pulsarResources;
-
-        private final OrderedExecutor executor;
-
-        private final EventLoopGroup eventLoopGroup;
-
-        private final ManagedLedgerStorage managedLedgerClientFactory;
-
-        private final boolean useSpies;
-
-        private final PulsarService pulsarService;
-
-        private final Compactor compactor;
-
-        private final BrokerService brokerService;
-
-        @Getter(AccessLevel.NONE)
-        @Singular
-        private final List<AutoCloseable> cleanupFunctions;
-
-        public ManagedLedgerFactory getManagedLedgerFactory() {
-            return managedLedgerClientFactory.getManagedLedgerFactory();
-        }
-
-        public static FactoryBuilder builder() {
-            return new CustomFactoryBuilder();
-        }
-
-        public void close() throws Exception {
-            pulsarService.getBrokerService().close();
-            pulsarService.close();
-            GracefulExecutorServicesShutdown.initiate()
-                    .timeout(Duration.ZERO)
-                    .shutdown(executor)
-                    .handle().get();
-            eventLoopGroup.shutdownGracefully().get();
-            if (localMetadataStore != configurationMetadataStore) {
-                localMetadataStore.close();
-                configurationMetadataStore.close();
-            } else {
-                localMetadataStore.close();
-            }
-            for (AutoCloseable cleanup : cleanupFunctions) {
-                try {
-                    cleanup.close();
-                } catch (Exception e) {
-                    log.error("Failure in calling cleanup function", e);
-                }
-            }
-        }
-
-        public ServerCnx createServerCnxSpy() {
-            return 
spyWithClassAndConstructorArgsRecordingInvocations(ServerCnx.class,
-                    getPulsarService());
-        }
-
-        public static class FactoryBuilder {
-            protected boolean useTestPulsarResources = false;
-            protected MetadataStore pulsarResourcesMetadataStore;
-            protected Function<PulsarService, BrokerService> 
brokerServiceFunction;
-
-            public FactoryBuilder useTestPulsarResources() {
-                useTestPulsarResources = true;
-                return this;
-            }
-
-            public FactoryBuilder useTestPulsarResources(MetadataStore 
metadataStore) {
-                useTestPulsarResources = true;
-                pulsarResourcesMetadataStore = metadataStore;
-                return this;
-            }
-
-            public FactoryBuilder managedLedgerClients(BookKeeper 
bookKeeperClient,
-                                                       ManagedLedgerFactory 
managedLedgerFactory) {
-                return managedLedgerClientFactory(
-                        
Factory.createManagedLedgerClientFactory(bookKeeperClient, 
managedLedgerFactory));
-            }
-
-            public FactoryBuilder brokerServiceFunction(
-                    Function<PulsarService, BrokerService> 
brokerServiceFunction) {
-                this.brokerServiceFunction = brokerServiceFunction;
-                return this;
-            }
-        }
-
-        private static class CustomFactoryBuilder extends 
Factory.FactoryBuilder {
-
-            @Override
-            public Factory build() {
-                initializeDefaults();
-                return super.build();
-            }
-
-            private void initializeDefaults() {
-                try {
-                    if (super.managedLedgerClientFactory == null) {
-                        if (super.executor == null) {
-                            super.executor = 
OrderedExecutor.newBuilder().numThreads(1)
-                                    
.name(TestPulsarService.class.getSimpleName() + "-executor").build();
-                        }
-                        NonClosableMockBookKeeper mockBookKeeper;
-                        if (super.useSpies) {
-                            mockBookKeeper =
-                                    
spyWithClassAndConstructorArgs(NonClosableMockBookKeeper.class, super.executor);
-                        } else {
-                            mockBookKeeper = new 
NonClosableMockBookKeeper(super.executor);
-                        }
-                        cleanupFunction(() -> mockBookKeeper.reallyShutdown());
-                        ManagedLedgerFactory mlFactoryMock = 
mock(ManagedLedgerFactory.class);
-
-                        managedLedgerClientFactory(
-                                
Factory.createManagedLedgerClientFactory(mockBookKeeper, mlFactoryMock));
-                    }
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                }
-                if (super.config == null) {
-                    ServiceConfiguration svcConfig = new 
ServiceConfiguration();
-                    svcConfig.setBrokerShutdownTimeoutMs(0L);
-                    
svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
-                    svcConfig.setClusterName("pulsar-cluster");
-                    config(svcConfig);
-                }
-                if (super.localMetadataStore == null || 
super.configurationMetadataStore == null) {
-                    try {
-                        MetadataStoreExtended store = 
MetadataStoreFactoryImpl.createExtended("memory:local",
-                                MetadataStoreConfig.builder().build());
-                        if (super.localMetadataStore == null) {
-                            localMetadataStore(store);
-                        }
-                        if (super.configurationMetadataStore == null) {
-                            configurationMetadataStore(store);
-                        }
-                    } catch (MetadataStoreException e) {
-                        throw new RuntimeException(e);
-                    }
-                }
-                if (super.pulsarResources == null) {
-                    if (useTestPulsarResources) {
-                        MetadataStore metadataStore = 
pulsarResourcesMetadataStore;
-                        if (metadataStore == null) {
-                            metadataStore = super.configurationMetadataStore;
-                        }
-                        NamespaceResources nsr =
-                                
spyWithClassAndConstructorArgs(NamespaceResources.class, metadataStore, 30);
-                        TopicResources tsr = 
spyWithClassAndConstructorArgs(TopicResources.class, metadataStore);
-                        if (!super.useSpies) {
-                            pulsarResources(
-                                    new 
TestPulsarResources(super.localMetadataStore, super.configurationMetadataStore,
-                                            tsr, nsr));
-                        } else {
-                            pulsarResources(
-                                    
spyWithClassAndConstructorArgs(TestPulsarResources.class, 
super.localMetadataStore,
-                                            super.configurationMetadataStore, 
tsr, nsr));
-                        }
-                    } else {
-                        if (!super.useSpies) {
-                            pulsarResources(
-                                    new 
PulsarResources(super.localMetadataStore, super.configurationMetadataStore));
-                        } else {
-                            pulsarResources(
-                                    
spyWithClassAndConstructorArgs(PulsarResources.class, super.localMetadataStore,
-                                            super.configurationMetadataStore));
-                        }
-                    }
-                }
-                if (super.brokerServiceFunction == null) {
-                    if (super.brokerService == null) {
-                        if (super.eventLoopGroup == null) {
-                            super.eventLoopGroup = new NioEventLoopGroup();
-                        }
-                        brokerServiceFunction(pulsarService -> {
-                            try {
-                                if (!super.useSpies) {
-                                    return new 
TestBrokerService(pulsarService, super.eventLoopGroup);
-                                } else {
-                                    return 
spyWithClassAndConstructorArgs(TestBrokerService.class, pulsarService,
-                                            super.eventLoopGroup);
-                                }
-                            } catch (Exception e) {
-                                throw new RuntimeException(e);
-                            }
-                        });
-                    } else {
-                        brokerServiceFunction(pulsarService -> 
super.brokerService);
-                    }
-                }
-                if (!super.useSpies) {
-                    pulsarService(new TestPulsarService(super.config, 
super.localMetadataStore,
-                            super.configurationMetadataStore, 
super.pulsarResources, super.managedLedgerClientFactory,
-                            super.brokerServiceFunction, super.executor, 
super.compactor));
-                } else {
-                    
pulsarService(spyWithClassAndConstructorArgs(TestPulsarService.class, 
super.config,
-                            super.localMetadataStore, 
super.configurationMetadataStore, super.pulsarResources,
-                            super.managedLedgerClientFactory, 
super.brokerServiceFunction, super.executor,
-                            super.compactor));
-                }
-                if (super.brokerService == null) {
-                    brokerService(super.pulsarService.getBrokerService());
-                }
-            }
-        }
-
-        @NotNull
-        private static ManagedLedgerStorage 
createManagedLedgerClientFactory(BookKeeper bookKeeperClient,
-                                                                             
ManagedLedgerFactory managedLedgerFactory) {
-            return new ManagedLedgerStorage() {
-
-                @Override
-                public void initialize(ServiceConfiguration conf, 
MetadataStoreExtended metadataStore,
-                                       BookKeeperClientFactory 
bookkeeperProvider, EventLoopGroup eventLoopGroup)
-                        throws Exception {
-
-                }
-
-                @Override
-                public ManagedLedgerFactory getManagedLedgerFactory() {
-                    return managedLedgerFactory;
-                }
-
-                @Override
-                public StatsProvider getStatsProvider() {
-                    return new NullStatsProvider();
-                }
-
-                @Override
-                public BookKeeper getBookKeeperClient() {
-                    return bookKeeperClient;
-                }
-
-                @Override
-                public void close() throws IOException {
-
-                }
-            };
-        }
-    }
-
-    private static class TestPulsarResources extends PulsarResources {
-
-        private final TopicResources topicResources;
-        private final NamespaceResources namespaceResources;
-
-        public TestPulsarResources(MetadataStore localMetadataStore, 
MetadataStore configurationMetadataStore,
-                                   TopicResources topicResources, 
NamespaceResources namespaceResources) {
-            super(localMetadataStore, configurationMetadataStore);
-            this.topicResources = topicResources;
-            this.namespaceResources = namespaceResources;
-        }
-
-        @Override
-        public TopicResources getTopicResources() {
-            return topicResources;
-        }
-
-        @Override
-        public NamespaceResources getNamespaceResources() {
-            return namespaceResources;
-        }
-    }
-
-    private static class TestBrokerService extends BrokerService {
-
-        TestBrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) 
throws Exception {
-            super(pulsar, eventLoopGroup);
-        }
-
-        @Override
-        protected CompletableFuture<Map<String, String>> 
fetchTopicPropertiesAsync(TopicName topicName) {
-            return CompletableFuture.completedFuture(Collections.emptyMap());
-        }
-    }
-
-    // Prevent the MockBookKeeper instance from being closed when the broker 
is restarted within a test
-    private static class NonClosableMockBookKeeper extends 
PulsarMockBookKeeper {
-
-        public NonClosableMockBookKeeper(OrderedExecutor executor) throws 
Exception {
-            super(executor);
-        }
-
-        @Override
-        public void close() {
-            // no-op
-        }
-
-        @Override
-        public void shutdown() {
-            // no-op
-        }
-
-        public void reallyShutdown() {
-            super.shutdown();
-        }
-    }
-
-    private final MetadataStoreExtended localMetadataStore;
-    private final MetadataStoreExtended configurationMetadataStore;
-    private final PulsarResources pulsarResources;
-    private final ManagedLedgerStorage managedLedgerClientFactory;
-    private final BrokerService brokerService;
-
-    private final OrderedExecutor executor;
-
-    private final Compactor compactor;
-
-    private final SchemaRegistryService schemaRegistryService;
-
-    private final PulsarClient pulsarClient;
-
-    private final NamespaceService namespaceService;
-
-    protected TestPulsarService(ServiceConfiguration config, 
MetadataStoreExtended localMetadataStore,
-                                MetadataStoreExtended 
configurationMetadataStore, PulsarResources pulsarResources,
-                                ManagedLedgerStorage 
managedLedgerClientFactory,
-                                Function<PulsarService, BrokerService> 
brokerServiceFunction, OrderedExecutor executor,
-                                Compactor compactor) {
-        super(config);
-        this.localMetadataStore = localMetadataStore;
-        this.configurationMetadataStore = configurationMetadataStore;
-        this.pulsarResources = pulsarResources;
-        this.managedLedgerClientFactory = managedLedgerClientFactory;
-        this.brokerService = brokerServiceFunction.apply(this);
-        this.executor = executor;
-        this.compactor = compactor;
-        this.schemaRegistryService = 
spyWithClassAndConstructorArgs(DefaultSchemaRegistryService.class);
-        this.pulsarClient = mock(PulsarClientImpl.class);
-        this.namespaceService = mock(NamespaceService.class);
-        try {
-            startNamespaceService();
-        } catch (PulsarServerException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-
-    @Override
-    public Supplier<NamespaceService> getNamespaceServiceProvider() throws 
PulsarServerException {
-        return () -> namespaceService;
-    }
-
-    @Override
-    public synchronized PulsarClient getClient() throws PulsarServerException {
-        return pulsarClient;
-    }
-
-    @Override
-    public SchemaRegistryService getSchemaRegistryService() {
-        return schemaRegistryService;
-    }
-
-    @Override
-    public MetadataStore 
createConfigurationMetadataStore(PulsarMetadataEventSynchronizer synchronizer)
-            throws MetadataStoreException {
-        return configurationMetadataStore;
-    }
-
-    @Override
-    public MetadataStoreExtended 
createLocalMetadataStore(PulsarMetadataEventSynchronizer synchronizer)
-            throws MetadataStoreException, PulsarServerException {
-        return localMetadataStore;
-    }
-
-    @Override
-    public PulsarResources getPulsarResources() {
-        return pulsarResources;
-    }
-
-    public BrokerService getBrokerService() {
-        return brokerService;
-    }
-
-    @Override
-    public Compactor getCompactor() throws PulsarServerException {
-        if (compactor != null) {
-            return compactor;
-        } else {
-            return super.getCompactor();
-        }
-    }
-
-    @Override
-    public MetadataStore getConfigurationMetadataStore() {
-        return configurationMetadataStore;
-    }
-
-    @Override
-    public MetadataStoreExtended getLocalMetadataStore() {
-        return localMetadataStore;
-    }
-
-    @Override
-    public ManagedLedgerStorage getManagedLedgerClientFactory() {
-        return managedLedgerClientFactory;
-    }
-
-    @Override
-    public OrderedExecutor getOrderedExecutor() {
-        return executor;
-    }
-
-    @Override
-    protected PulsarResources newPulsarResources() {
-        return pulsarResources;
-    }
-
-    @Override
-    protected ManagedLedgerStorage newManagedLedgerClientFactory() throws 
Exception {
-        return managedLedgerClientFactory;
-    }
-
-    @Override
-    protected BrokerService newBrokerService(PulsarService pulsar) throws 
Exception {
-        return brokerService;
-    }
-}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java
index e4331d0b486..d5ea10da0f4 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java
@@ -16,54 +16,31 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.pulsar.broker.namespace;
 
-import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
-import com.google.common.util.concurrent.MoreExecutors;
-import io.netty.channel.EventLoopGroup;
-import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
-import org.apache.bookkeeper.client.PulsarMockBookKeeper;
-import org.apache.bookkeeper.common.util.OrderedExecutor;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.util.ZkUtils;
-import org.apache.pulsar.broker.BookKeeperClientFactory;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.auth.SameThreadOrderedSafeExecutor;
-import org.apache.pulsar.broker.intercept.CounterBrokerInterceptor;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.common.policies.data.TopicType;
-import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
-import org.apache.pulsar.metadata.impl.ZKMetadataStore;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.MockZooKeeper;
-import org.apache.zookeeper.MockZooKeeperSession;
-import org.apache.zookeeper.data.ACL;
 
 @Slf4j
-public class OwnerShipForCurrentServerTestBase {
+public abstract class OwnerShipForCurrentServerTestBase {
 
     public static final String CLUSTER_NAME = "test";
 
     @Setter
     private int brokerCount = 3;
 
-    private final List<SameThreadOrderedSafeExecutor> orderedExecutorList = 
new ArrayList<>();
     @Getter
     private final List<ServiceConfiguration> serviceConfigurationList = new 
ArrayList<>();
     @Getter
@@ -72,9 +49,7 @@ public class OwnerShipForCurrentServerTestBase {
     protected PulsarAdmin admin;
     protected PulsarClient pulsarClient;
 
-    private MockZooKeeper mockZooKeeper;
-    private OrderedExecutor bkExecutor;
-    private NonClosableMockBookKeeper mockBookKeeper;
+    protected List<PulsarTestContext> pulsarTestContexts = new ArrayList<>();
 
     public void internalSetup() throws Exception {
         init();
@@ -85,13 +60,6 @@ public class OwnerShipForCurrentServerTestBase {
     }
 
     private void init() throws Exception {
-        mockZooKeeper = createMockZooKeeper();
-
-        bkExecutor = OrderedExecutor.newBuilder()
-                .numThreads(1)
-                .name("mock-pulsar-bk")
-                .build();
-        mockBookKeeper = createMockBookKeeper(bkExecutor);
         startBroker();
     }
 
@@ -118,96 +86,22 @@ public class OwnerShipForCurrentServerTestBase {
             conf.setWebServicePortTls(Optional.of(0));
             serviceConfigurationList.add(conf);
 
-            PulsarService pulsar = 
spyWithClassAndConstructorArgs(PulsarService.class, conf);
-
-            setupBrokerMocks(pulsar);
-            pulsar.start();
+            PulsarTestContext.Builder testContextBuilder =
+                    PulsarTestContext.startableBuilder()
+                            .config(conf);
+            if (i > 0) {
+                
testContextBuilder.reuseMockBookkeeperAndMetadataStores(pulsarTestContexts.get(0));
+            } else {
+                testContextBuilder.withMockZookeeper();
+            }
+            PulsarTestContext pulsarTestContext = testContextBuilder
+                    .build();
+            PulsarService pulsar = pulsarTestContext.getPulsarService();
             pulsarServiceList.add(pulsar);
+            pulsarTestContexts.add(pulsarTestContext);
         }
     }
 
-    protected void setupBrokerMocks(PulsarService pulsar) throws Exception {
-        // Override default providers with mocked ones
-        
doReturn(mockBookKeeperClientFactory).when(pulsar).newBookKeeperClientFactory();
-        MockZooKeeperSession mockZooKeeperSession = 
MockZooKeeperSession.newInstance(mockZooKeeper);
-        doReturn(new 
ZKMetadataStore(mockZooKeeperSession)).when(pulsar).createLocalMetadataStore(null);
-        doReturn(new 
ZKMetadataStore(mockZooKeeperSession)).when(pulsar).createConfigurationMetadataStore(null);
-        Supplier<NamespaceService> namespaceServiceSupplier = () -> 
spyWithClassAndConstructorArgs(
-                NamespaceService.class, pulsar);
-        
doReturn(namespaceServiceSupplier).when(pulsar).getNamespaceServiceProvider();
-
-        SameThreadOrderedSafeExecutor executor = new 
SameThreadOrderedSafeExecutor();
-        orderedExecutorList.add(executor);
-        doReturn(executor).when(pulsar).getOrderedExecutor();
-        doReturn(new 
CounterBrokerInterceptor()).when(pulsar).getBrokerInterceptor();
-
-        doAnswer((invocation) -> 
spy(invocation.callRealMethod())).when(pulsar).newCompactor();
-    }
-
-    public static MockZooKeeper createMockZooKeeper() throws Exception {
-        MockZooKeeper zk = 
MockZooKeeper.newInstance(MoreExecutors.newDirectExecutorService());
-        List<ACL> dummyAclList = new ArrayList<>(0);
-
-        ZkUtils.createFullPathOptimistic(zk, "/ledgers/available/192.168.1.1:" 
+ 5000,
-                "".getBytes(StandardCharsets.UTF_8), dummyAclList, 
CreateMode.PERSISTENT);
-
-        zk.create("/ledgers/LAYOUT", 
"1\nflat:1".getBytes(StandardCharsets.UTF_8), dummyAclList,
-                CreateMode.PERSISTENT);
-        return zk;
-    }
-
-    public static NonClosableMockBookKeeper 
createMockBookKeeper(OrderedExecutor executor) throws Exception {
-        return spyWithClassAndConstructorArgs(NonClosableMockBookKeeper.class, 
executor);
-    }
-
-    // Prevent the MockBookKeeper instance from being closed when the broker 
is restarted within a test
-    public static class NonClosableMockBookKeeper extends PulsarMockBookKeeper 
{
-
-        public NonClosableMockBookKeeper(OrderedExecutor executor) throws 
Exception {
-            super(executor);
-        }
-
-        @Override
-        public void close() {
-            // no-op
-        }
-
-        @Override
-        public void shutdown() {
-            // no-op
-        }
-
-        public void reallyShutdown() {
-            super.shutdown();
-        }
-    }
-
-    private final BookKeeperClientFactory mockBookKeeperClientFactory = new 
BookKeeperClientFactory() {
-
-        @Override
-        public BookKeeper create(ServiceConfiguration conf, 
MetadataStoreExtended store,
-                                 EventLoopGroup eventLoopGroup,
-                                 Optional<Class<? extends 
EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
-                                 Map<String, Object> properties) {
-            // Always return the same instance (so that we don't loose the 
mock BK content on broker restart
-            return mockBookKeeper;
-        }
-
-        @Override
-        public BookKeeper create(ServiceConfiguration conf, 
MetadataStoreExtended store,
-                                 EventLoopGroup eventLoopGroup,
-                                 Optional<Class<? extends 
EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
-                                 Map<String, Object> properties, StatsLogger 
statsLogger) {
-            // Always return the same instance (so that we don't loose the 
mock BK content on broker restart
-            return mockBookKeeper;
-        }
-
-        @Override
-        public void close() {
-            // no-op
-        }
-    };
-
     protected final void internalCleanup() {
         try {
             // if init fails, some of these could be null, and if so would 
throw
@@ -220,49 +114,19 @@ public class OwnerShipForCurrentServerTestBase {
                 pulsarClient.shutdown();
                 pulsarClient = null;
             }
-            if (pulsarServiceList.size() > 0) {
-                for (PulsarService pulsarService : pulsarServiceList) {
-                    pulsarService.close();
+            if (pulsarTestContexts.size() > 0) {
+                for(int i = pulsarTestContexts.size() - 1; i >= 0; i--) {
+                    pulsarTestContexts.get(i).close();
                 }
-                pulsarServiceList.clear();
+                pulsarTestContexts.clear();
             }
+            pulsarServiceList.clear();
             if (serviceConfigurationList.size() > 0) {
                 serviceConfigurationList.clear();
             }
-            if (mockBookKeeper != null) {
-                mockBookKeeper.reallyShutdown();
-            }
-            if (mockZooKeeper != null) {
-                mockZooKeeper.shutdown();
-            }
-            if (orderedExecutorList.size() > 0) {
-                for (int i = 0; i < orderedExecutorList.size(); i++) {
-                    SameThreadOrderedSafeExecutor 
sameThreadOrderedSafeExecutor = orderedExecutorList.get(i);
-                    if(sameThreadOrderedSafeExecutor != null) {
-                        try {
-                            sameThreadOrderedSafeExecutor.shutdownNow();
-                            sameThreadOrderedSafeExecutor.awaitTermination(5, 
TimeUnit.SECONDS);
-                        } catch (InterruptedException ex) {
-                            log.error("sameThreadOrderedSafeExecutor shutdown 
had error", ex);
-                            Thread.currentThread().interrupt();
-                        }
-                        orderedExecutorList.set(i, null);
-                    }
-                }
-            }
-            if(bkExecutor != null) {
-                try {
-                    bkExecutor.shutdownNow();
-                    bkExecutor.awaitTermination(5, TimeUnit.SECONDS);
-                } catch (InterruptedException ex) {
-                    log.error("bkExecutor shutdown had error", ex);
-                    Thread.currentThread().interrupt();
-                }
-                bkExecutor = null;
-            }
+
         } catch (Exception e) {
             log.warn("Failed to clean up mocked pulsar service:", e);
         }
     }
-
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
index 703376b2546..d01fa1fa540 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
@@ -38,7 +38,7 @@ import io.netty.channel.ChannelHandlerContext;
 import java.net.InetSocketAddress;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
-import org.apache.pulsar.broker.TestPulsarService;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.MessageId;
@@ -57,14 +57,14 @@ public class MessageCumulativeAckTest {
 
     private ServerCnx serverCnx;
     private PersistentSubscription sub;
-    private TestPulsarService.Factory testPulsarServiceFactory;
+    private PulsarTestContext pulsarTestContext;
 
     @BeforeMethod
     public void setup() throws Exception {
-        testPulsarServiceFactory = TestPulsarService.Factory.builder()
+        pulsarTestContext = PulsarTestContext.builder()
                 .build();
 
-        serverCnx = testPulsarServiceFactory.createServerCnxSpy();
+        serverCnx = pulsarTestContext.createServerCnxSpy();
         doReturn(true).when(serverCnx).isActive();
         doReturn(true).when(serverCnx).isWritable();
         doReturn(new InetSocketAddress("localhost", 
1234)).when(serverCnx).clientAddress();
@@ -74,7 +74,7 @@ public class MessageCumulativeAckTest {
                 .when(serverCnx).getCommandSender();
 
         String topicName = 
TopicName.get("MessageCumulativeAckTest").toString();
-        PersistentTopic persistentTopic = new PersistentTopic(topicName, 
mock(ManagedLedger.class), testPulsarServiceFactory.getBrokerService());
+        PersistentTopic persistentTopic = new PersistentTopic(topicName, 
mock(ManagedLedger.class), pulsarTestContext.getBrokerService());
         sub = spy(new PersistentSubscription(persistentTopic, "sub-1",
             mock(ManagedCursorImpl.class), false));
         doNothing().when(sub).acknowledgeMessage(any(), any(), any());
@@ -82,9 +82,9 @@ public class MessageCumulativeAckTest {
 
     @AfterMethod(alwaysRun = true)
     public void shutdown() throws Exception {
-        if (testPulsarServiceFactory != null) {
-            testPulsarServiceFactory.close();
-            testPulsarServiceFactory = null;
+        if (pulsarTestContext != null) {
+            pulsarTestContext.close();
+            pulsarTestContext = null;
         }
         sub = null;
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index 4dfaade33ca..d9a9dbcbcf2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -60,7 +60,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.TestPulsarService;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
 import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
@@ -91,7 +91,7 @@ public class PersistentDispatcherFailoverConsumerTest {
     private ChannelHandlerContext channelCtx;
     private LinkedBlockingQueue<CommandActiveConsumerChange> consumerChanges;
 
-    protected TestPulsarService.Factory testPulsarServiceFactory;
+    protected PulsarTestContext pulsarTestContext;
 
     final String successTopicName = 
"persistent://part-perf/global/perf.t1/ptopic";
     final String failTopicName = 
"persistent://part-perf/global/perf.t1/pfailTopic";
@@ -104,9 +104,9 @@ public class PersistentDispatcherFailoverConsumerTest {
         svcConfig.setClusterName("pulsar-cluster");
         svcConfig.setSystemTopicEnabled(false);
         svcConfig.setTopicLevelPoliciesEnabled(false);
-        testPulsarServiceFactory = TestPulsarService.Factory.builder()
+        pulsarTestContext = PulsarTestContext.builder()
                 .config(svcConfig)
-                .useSpies(true)
+                .spyByDefault()
                 .build();
 
         consumerChanges = new LinkedBlockingQueue<>();
@@ -132,7 +132,7 @@ public class PersistentDispatcherFailoverConsumerTest {
             return null;
         }).when(channelCtx).writeAndFlush(any(), any());
 
-        serverCnx = testPulsarServiceFactory.createServerCnxSpy();
+        serverCnx = pulsarTestContext.createServerCnxSpy();
         doReturn(true).when(serverCnx).isActive();
         doReturn(true).when(serverCnx).isWritable();
         doReturn(new InetSocketAddress("localhost", 
1234)).when(serverCnx).clientAddress();
@@ -141,7 +141,7 @@ public class PersistentDispatcherFailoverConsumerTest {
         doReturn(new PulsarCommandSenderImpl(null, serverCnx))
                 .when(serverCnx).getCommandSender();
 
-        serverCnxWithOldVersion = 
testPulsarServiceFactory.createServerCnxSpy();
+        serverCnxWithOldVersion = pulsarTestContext.createServerCnxSpy();
         doReturn(true).when(serverCnxWithOldVersion).isActive();
         doReturn(true).when(serverCnxWithOldVersion).isWritable();
         doReturn(new InetSocketAddress("localhost", 1234))
@@ -153,7 +153,7 @@ public class PersistentDispatcherFailoverConsumerTest {
                 .when(serverCnxWithOldVersion).getCommandSender();
 
         NamespaceService nsSvc = mock(NamespaceService.class);
-        
doReturn(nsSvc).when(testPulsarServiceFactory.getPulsarService()).getNamespaceService();
+        
doReturn(nsSvc).when(pulsarTestContext.getPulsarService()).getNamespaceService();
         
doReturn(true).when(nsSvc).isServiceUnitOwned(any(NamespaceBundle.class));
         doReturn(true).when(nsSvc).isServiceUnitActive(any(TopicName.class));
         
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkTopicOwnership(any(TopicName.class));
@@ -164,9 +164,9 @@ public class PersistentDispatcherFailoverConsumerTest {
 
     @AfterMethod(alwaysRun = true)
     public void shutdown() throws Exception {
-        if (testPulsarServiceFactory != null) {
-            testPulsarServiceFactory.close();
-            testPulsarServiceFactory = null;
+        if (pulsarTestContext != null) {
+            pulsarTestContext.close();
+            pulsarTestContext = null;
         }
     }
 
@@ -181,7 +181,7 @@ public class PersistentDispatcherFailoverConsumerTest {
         doAnswer(invocationOnMock -> {
             ((OpenLedgerCallback) 
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
             return null;
-        }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+        }).when(pulsarTestContext.getManagedLedgerFactory())
                 .asyncOpen(matches(".*success.*"), 
any(ManagedLedgerConfig.class),
                         any(OpenLedgerCallback.class), any(Supplier.class), 
any());
 
@@ -190,7 +190,7 @@ public class PersistentDispatcherFailoverConsumerTest {
             ((OpenLedgerCallback) invocationOnMock.getArguments()[2])
                     .openLedgerFailed(new ManagedLedgerException("Managed 
ledger failure"), null);
             return null;
-        }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+        }).when(pulsarTestContext.getManagedLedgerFactory())
                 .asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class),
                         any(OpenLedgerCallback.class), any(Supplier.class), 
any());
 
@@ -231,7 +231,7 @@ public class PersistentDispatcherFailoverConsumerTest {
     @Test
     public void testConsumerGroupChangesWithOldNewConsumers() throws Exception 
{
         PersistentTopic topic =
-                new PersistentTopic(successTopicName, ledgerMock, 
testPulsarServiceFactory.getBrokerService());
+                new PersistentTopic(successTopicName, ledgerMock, 
pulsarTestContext.getBrokerService());
         PersistentSubscription sub = new PersistentSubscription(topic, 
"sub-1", cursorMock, false);
 
         int partitionIndex = 0;
@@ -272,7 +272,7 @@ public class PersistentDispatcherFailoverConsumerTest {
     public void testAddRemoveConsumer() throws Exception {
         log.info("--- Starting 
PersistentDispatcherFailoverConsumerTest::testAddConsumer ---");
 
-        PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, testPulsarServiceFactory.getBrokerService());
+        PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, pulsarTestContext.getBrokerService());
         PersistentSubscription sub = new PersistentSubscription(topic, 
"sub-1", cursorMock, false);
 
         int partitionIndex = 4;
@@ -404,7 +404,7 @@ public class PersistentDispatcherFailoverConsumerTest {
         log.info("--- Starting 
PersistentDispatcherFailoverConsumerTest::testAddConsumer ---");
 
         PersistentTopic topic =
-                new PersistentTopic(successTopicName, ledgerMock, 
testPulsarServiceFactory.getBrokerService());
+                new PersistentTopic(successTopicName, ledgerMock, 
pulsarTestContext.getBrokerService());
         PersistentSubscription sub = new PersistentSubscription(topic, 
"sub-1", cursorMock, false);
 
         // Non partitioned topic.
@@ -465,7 +465,7 @@ public class PersistentDispatcherFailoverConsumerTest {
     public void 
testMultipleDispatcherGetNextConsumerWithDifferentPriorityLevel() throws 
Exception {
 
         PersistentTopic topic =
-                new PersistentTopic(successTopicName, ledgerMock, 
testPulsarServiceFactory.getBrokerService());
+                new PersistentTopic(successTopicName, ledgerMock, 
pulsarTestContext.getBrokerService());
         PersistentDispatcherMultipleConsumers dispatcher = new 
PersistentDispatcherMultipleConsumers(topic, cursorMock, null);
         Consumer consumer1 = createConsumer(topic, 0, 2, false, 1);
         Consumer consumer2 = createConsumer(topic, 0, 2, false, 2);
@@ -510,7 +510,7 @@ public class PersistentDispatcherFailoverConsumerTest {
     @Test
     public void testFewBlockedConsumerSamePriority() throws Exception{
         PersistentTopic topic =
-                new PersistentTopic(successTopicName, ledgerMock, 
testPulsarServiceFactory.getBrokerService());
+                new PersistentTopic(successTopicName, ledgerMock, 
pulsarTestContext.getBrokerService());
         PersistentDispatcherMultipleConsumers dispatcher = new 
PersistentDispatcherMultipleConsumers(topic, cursorMock, null);
         Consumer consumer1 = createConsumer(topic, 0, 2, false, 1);
         Consumer consumer2 = createConsumer(topic, 0, 2, false, 2);
@@ -538,7 +538,7 @@ public class PersistentDispatcherFailoverConsumerTest {
     @Test
     public void testFewBlockedConsumerDifferentPriority() throws Exception {
         PersistentTopic topic =
-                new PersistentTopic(successTopicName, ledgerMock, 
testPulsarServiceFactory.getBrokerService());
+                new PersistentTopic(successTopicName, ledgerMock, 
pulsarTestContext.getBrokerService());
         PersistentDispatcherMultipleConsumers dispatcher = new 
PersistentDispatcherMultipleConsumers(topic, cursorMock, null);
         Consumer consumer1 = createConsumer(topic, 0, 2, false, 1);
         Consumer consumer2 = createConsumer(topic, 0, 2, false, 2);
@@ -593,7 +593,7 @@ public class PersistentDispatcherFailoverConsumerTest {
     @Test
     public void testFewBlockedConsumerDifferentPriority2() throws Exception {
         PersistentTopic topic =
-                new PersistentTopic(successTopicName, ledgerMock, 
testPulsarServiceFactory.getBrokerService());
+                new PersistentTopic(successTopicName, ledgerMock, 
pulsarTestContext.getBrokerService());
         PersistentDispatcherMultipleConsumers dispatcher = new 
PersistentDispatcherMultipleConsumers(topic, cursorMock, null);
         Consumer consumer1 = createConsumer(topic, 0, 2, true, 1);
         Consumer consumer2 = createConsumer(topic, 0, 2, true, 2);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index ae9fb9a1bf7..0bf60679663 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.pulsar.broker.service;
 
 import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgsRecordingInvocations;
@@ -91,7 +92,6 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.TestPulsarService;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.persistent.CompactorSubscription;
 import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator;
@@ -100,10 +100,12 @@ import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleAct
 import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.ProducerBuilderImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
@@ -155,7 +157,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
     final String successSubName2 = "successSub2";
     private static final Logger log = 
LoggerFactory.getLogger(PersistentTopicTest.class);
 
-    protected TestPulsarService.Factory testPulsarServiceFactory;
+    protected PulsarTestContext pulsarTestContext;
 
     private BrokerService brokerService;
 
@@ -170,24 +172,24 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         svcConfig.setClusterName("pulsar-cluster");
         svcConfig.setTopicLevelPoliciesEnabled(false);
         svcConfig.setSystemTopicEnabled(false);
-        testPulsarServiceFactory = TestPulsarService.Factory.builder()
+        pulsarTestContext = PulsarTestContext.builder()
                 .config(svcConfig)
-                .useSpies(true)
+                .spyByDefault()
                 .useTestPulsarResources(metadataStore)
                 .compactor(mock(Compactor.class))
                 .build();
-        brokerService = testPulsarServiceFactory.getBrokerService();
+        brokerService = pulsarTestContext.getBrokerService();
 
         doAnswer(invocationOnMock -> CompletableFuture.completedFuture(null))
-                
.when(testPulsarServiceFactory.getManagedLedgerFactory()).getManagedLedgerPropertiesAsync(any());
+                
.when(pulsarTestContext.getManagedLedgerFactory()).getManagedLedgerPropertiesAsync(any());
         doAnswer(invocation -> {
             DeleteLedgerCallback deleteLedgerCallback = 
invocation.getArgument(1);
             deleteLedgerCallback.deleteLedgerComplete(null);
             return null;
-        
}).when(testPulsarServiceFactory.getManagedLedgerFactory()).asyncDelete(any(), 
any(), any());
+        
}).when(pulsarTestContext.getManagedLedgerFactory()).asyncDelete(any(), any(), 
any());
         // Mock serviceCnx.
         serverCnx = 
spyWithClassAndConstructorArgsRecordingInvocations(ServerCnx.class,
-                testPulsarServiceFactory.getPulsarService());
+                pulsarTestContext.getPulsarService());
         doReturn(true).when(serverCnx).isActive();
         doReturn(true).when(serverCnx).isWritable();
         doReturn(new InetSocketAddress("localhost", 
1234)).when(serverCnx).clientAddress();
@@ -202,7 +204,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         NamespaceService nsSvc = mock(NamespaceService.class);
         NamespaceBundle bundle = mock(NamespaceBundle.class);
         
doReturn(CompletableFuture.completedFuture(bundle)).when(nsSvc).getBundleAsync(any());
-        
doReturn(nsSvc).when(testPulsarServiceFactory.getPulsarService()).getNamespaceService();
+        
doReturn(nsSvc).when(pulsarTestContext.getPulsarService()).getNamespaceService();
         doReturn(true).when(nsSvc).isServiceUnitOwned(any());
         doReturn(true).when(nsSvc).isServiceUnitActive(any());
         
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).isServiceUnitActiveAsync(any());
@@ -213,9 +215,9 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
 
     @AfterMethod(alwaysRun = true)
     public void teardown() throws Exception {
-        if (testPulsarServiceFactory != null) {
-            testPulsarServiceFactory.close();
-            testPulsarServiceFactory = null;
+        if (pulsarTestContext != null) {
+            pulsarTestContext.close();
+            pulsarTestContext = null;
         }
     }
 
@@ -228,7 +230,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         doAnswer(invocationOnMock -> {
             ((OpenLedgerCallback) 
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
             return null;
-        }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+        }).when(pulsarTestContext.getManagedLedgerFactory())
                 .asyncOpen(anyString(), any(ManagedLedgerConfig.class), 
any(OpenLedgerCallback.class),
                         any(Supplier.class), any());
 
@@ -254,7 +256,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
                     .openLedgerFailed(new ManagedLedgerException("Managed 
ledger failure"), null)).start();
 
             return null;
-        }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+        }).when(pulsarTestContext.getManagedLedgerFactory())
                 .asyncOpen(anyString(), any(ManagedLedgerConfig.class), 
any(OpenLedgerCallback.class),
                         any(Supplier.class), any());
 
@@ -315,19 +317,24 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
 
     @Test
     public void testDispatcherMultiConsumerReadFailed() {
-        PersistentTopic topic = 
spyWithClassAndConstructorArgsRecordingInvocations(PersistentTopic.class, 
successTopicName, ledgerMock, brokerService);
+        PersistentTopic topic =
+                
spyWithClassAndConstructorArgsRecordingInvocations(PersistentTopic.class, 
successTopicName, ledgerMock,
+                        brokerService);
         ManagedCursor cursor = mock(ManagedCursor.class);
         when(cursor.getName()).thenReturn("cursor");
         Subscription subscription = mock(Subscription.class);
         when(subscription.getName()).thenReturn("sub");
-        PersistentDispatcherMultipleConsumers dispatcher = new 
PersistentDispatcherMultipleConsumers(topic, cursor, subscription);
+        PersistentDispatcherMultipleConsumers dispatcher =
+                new PersistentDispatcherMultipleConsumers(topic, cursor, 
subscription);
         dispatcher.readEntriesFailed(new 
ManagedLedgerException.InvalidCursorPositionException("failed"), null);
         verify(topic, atLeast(1)).getBrokerService();
     }
 
     @Test
     public void testDispatcherSingleConsumerReadFailed() {
-        PersistentTopic topic = 
spyWithClassAndConstructorArgsRecordingInvocations(PersistentTopic.class, 
successTopicName, ledgerMock, brokerService);
+        PersistentTopic topic =
+                
spyWithClassAndConstructorArgsRecordingInvocations(PersistentTopic.class, 
successTopicName, ledgerMock,
+                        brokerService);
         ManagedCursor cursor = mock(ManagedCursor.class);
         when(cursor.getName()).thenReturn("cursor");
         PersistentDispatcherSingleActiveConsumer dispatcher = new 
PersistentDispatcherSingleActiveConsumer(cursor,
@@ -517,7 +524,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
     @Test
     public void testMaxProducersForBroker() {
         // set max clients
-        testPulsarServiceFactory.getConfig().setMaxProducersPerTopic(2);
+        pulsarTestContext.getConfig().setMaxProducersPerTopic(2);
         testMaxProducers();
     }
 
@@ -526,9 +533,9 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         // set max clients
         Policies policies = new Policies();
         policies.max_producers_per_topic = 2;
-        testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
-                        
.createPolicies(TopicName.get(successTopicName).getNamespaceObject(),
-                                policies);
+        pulsarTestContext.getPulsarResources().getNamespaceResources()
+                
.createPolicies(TopicName.get(successTopicName).getNamespaceObject(),
+                        policies);
         testMaxProducers();
     }
 
@@ -536,7 +543,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         final String producerNameBase = "producer";
         final String role = "appid1";
 
-        ServerCnx cnx = testPulsarServiceFactory.createServerCnxSpy();
+        ServerCnx cnx = pulsarTestContext.createServerCnxSpy();
         doReturn(true).when(cnx).isActive();
         doReturn(true).when(cnx).isWritable();
         doReturn(new InetSocketAddress(address, 
1234)).when(cnx).clientAddress();
@@ -550,7 +557,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
     @Test
     public void testMaxSameAddressProducers() throws Exception {
         // set max clients
-        
testPulsarServiceFactory.getConfig().setMaxSameAddressProducersPerTopic(2);
+        pulsarTestContext.getConfig().setMaxSameAddressProducersPerTopic(2);
 
         PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
 
@@ -694,7 +701,8 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
 
         Consumer consumer = new Consumer(sub, SubType.Exclusive, 
topic.getName(), 1, 0, "Cons1", true, serverCnx,
                 "myrole-1", Collections.emptyMap(), false,
-                new 
KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT), MessageId.latest, 
DEFAULT_CONSUMER_EPOCH);
+                new 
KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT), MessageId.latest,
+                DEFAULT_CONSUMER_EPOCH);
         sub.addConsumer(consumer);
         consumer.close();
 
@@ -705,7 +713,8 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
 
             consumer = new Consumer(sub, subType, topic.getName(), 1, 0, 
"Cons1", true, serverCnx, "myrole-1",
                     Collections.emptyMap(), false,
-                    new 
KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT), MessageId.latest, 
DEFAULT_CONSUMER_EPOCH);
+                    new 
KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT), MessageId.latest,
+                    DEFAULT_CONSUMER_EPOCH);
             sub.addConsumer(consumer);
 
             assertTrue(sub.getDispatcher().isConsumerConnected());
@@ -727,8 +736,10 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         PersistentSubscription sub = new PersistentSubscription(topic, 
"sub-1", cursorMock, false);
 
         // 1. simple add consumer
-        Consumer consumer = new Consumer(sub, SubType.Exclusive, 
topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */,
-                true, serverCnx, "myrole-1", Collections.emptyMap(), false /* 
read compacted */, null, MessageId.latest, DEFAULT_CONSUMER_EPOCH);
+        Consumer consumer = new Consumer(sub, SubType.Exclusive, 
topic.getName(), 1 /* consumer id */, 0,
+                "Cons1"/* consumer name */,
+                true, serverCnx, "myrole-1", Collections.emptyMap(), false /* 
read compacted */, null, MessageId.latest,
+                DEFAULT_CONSUMER_EPOCH);
         sub.addConsumer(consumer);
         assertTrue(sub.getDispatcher().isConsumerConnected());
 
@@ -853,8 +864,8 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
     @Test
     public void testMaxConsumersSharedForBroker() throws Exception {
         // set max clients
-        testPulsarServiceFactory.getConfig().setMaxConsumersPerSubscription(2);
-        testPulsarServiceFactory.getConfig().setMaxConsumersPerTopic(3);
+        pulsarTestContext.getConfig().setMaxConsumersPerSubscription(2);
+        pulsarTestContext.getConfig().setMaxConsumersPerTopic(3);
         testMaxConsumersShared();
     }
 
@@ -865,7 +876,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         policies.max_consumers_per_subscription = 2;
         policies.max_consumers_per_topic = 3;
 
-        testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
+        pulsarTestContext.getPulsarResources().getNamespaceResources()
                 
.createPolicies(TopicName.get(successTopicName).getNamespaceObject(),
                         policies);
 
@@ -949,8 +960,8 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
     @Test
     public void testMaxConsumersFailoverForBroker() throws Exception {
         // set max clients
-        testPulsarServiceFactory.getConfig().setMaxConsumersPerSubscription(2);
-        testPulsarServiceFactory.getConfig().setMaxConsumersPerTopic(3);
+        pulsarTestContext.getConfig().setMaxConsumersPerSubscription(2);
+        pulsarTestContext.getConfig().setMaxConsumersPerTopic(3);
 
         testMaxConsumersFailover();
     }
@@ -962,7 +973,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         policies.max_consumers_per_subscription = 2;
         policies.max_consumers_per_topic = 3;
 
-        testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
+        pulsarTestContext.getPulsarResources().getNamespaceResources()
                 
.createPolicies(TopicName.get(successTopicName).getNamespaceObject(),
                         policies);
 
@@ -974,7 +985,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         final String consumerNameBase = "consumer";
         final String role = "appid1";
 
-        ServerCnx cnx = testPulsarServiceFactory.createServerCnxSpy();
+        ServerCnx cnx = pulsarTestContext.createServerCnxSpy();
         doReturn(true).when(cnx).isActive();
         doReturn(true).when(cnx).isWritable();
         doReturn(new InetSocketAddress(address, 
1234)).when(cnx).clientAddress();
@@ -988,7 +999,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
     @Test
     public void testMaxSameAddressConsumers() throws Exception {
         // set max clients
-        
testPulsarServiceFactory.getConfig().setMaxSameAddressConsumersPerTopic(2);
+        pulsarTestContext.getConfig().setMaxSameAddressConsumersPerTopic(2);
 
         PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
         PersistentSubscription sub1 = new PersistentSubscription(topic, 
"sub1", cursorMock, false);
@@ -1091,8 +1102,10 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
     public void testUbsubscribeRaceConditions() throws Exception {
         PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
         PersistentSubscription sub = new PersistentSubscription(topic, 
"sub-1", cursorMock, false);
-        Consumer consumer1 = new Consumer(sub, SubType.Exclusive, 
topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */,
-                true, serverCnx, "myrole-1", Collections.emptyMap(), false /* 
read compacted */, null, MessageId.latest, DEFAULT_CONSUMER_EPOCH);
+        Consumer consumer1 = new Consumer(sub, SubType.Exclusive, 
topic.getName(), 1 /* consumer id */, 0,
+                "Cons1"/* consumer name */,
+                true, serverCnx, "myrole-1", Collections.emptyMap(), false /* 
read compacted */, null, MessageId.latest,
+                DEFAULT_CONSUMER_EPOCH);
         sub.addConsumer(consumer1);
 
         doAnswer(invocationOnMock -> {
@@ -1113,7 +1126,8 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
             Thread.sleep(10); /* delay to ensure that the ubsubscribe gets 
executed first */
             sub.addConsumer(new Consumer(sub, SubType.Exclusive, 
topic.getName(), 2 /* consumer id */,
                     0, "Cons2"/* consumer name */, true, serverCnx,
-                    "myrole-1", Collections.emptyMap(), false /* read 
compacted */, null, MessageId.latest, DEFAULT_CONSUMER_EPOCH)).get();
+                    "myrole-1", Collections.emptyMap(), false /* read 
compacted */, null, MessageId.latest,
+                    DEFAULT_CONSUMER_EPOCH)).get();
             fail();
         } catch (Exception e) {
             assertTrue(e.getCause() instanceof 
BrokerServiceException.SubscriptionFencedException);
@@ -1350,7 +1364,8 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
             String role = "appid1";
             Thread.sleep(10); /* delay to ensure that the delete gets executed 
first */
             Producer producer = new Producer(topic, serverCnx, 1 /* producer 
id */, "prod-name",
-                    role, false, null, SchemaVersion.Latest, 0, false, 
ProducerAccessMode.Shared, Optional.empty(), true);
+                    role, false, null, SchemaVersion.Latest, 0, false, 
ProducerAccessMode.Shared, Optional.empty(),
+                    true);
             topic.addProducer(producer, new CompletableFuture<>()).join();
             fail("Should have failed");
         } catch (Exception e) {
@@ -1412,7 +1427,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         doAnswer(invocationOnMock -> {
             ((OpenLedgerCallback) 
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
             return null;
-        }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+        }).when(pulsarTestContext.getManagedLedgerFactory())
                 .asyncOpen(matches(".*success.*"), 
any(ManagedLedgerConfig.class),
                         any(OpenLedgerCallback.class), any(Supplier.class), 
any());
 
@@ -1421,7 +1436,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
             ((OpenLedgerCallback) invocationOnMock.getArguments()[2])
                     .openLedgerFailed(new ManagedLedgerException("Managed 
ledger failure"), null);
             return null;
-        }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+        }).when(pulsarTestContext.getManagedLedgerFactory())
                 .asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class),
                         any(OpenLedgerCallback.class), any(Supplier.class), 
any());
 
@@ -1437,7 +1452,9 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         doAnswer(invocationOnMock -> {
             ((OpenCursorCallback) 
invocationOnMock.getArguments()[2]).openCursorComplete(cursorMock, null);
             return null;
-        }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), 
any(InitialPosition.class), any(OpenCursorCallback.class), any());
+        }).when(ledgerMock)
+                .asyncOpenCursor(matches(".*success.*"), 
any(InitialPosition.class), any(OpenCursorCallback.class),
+                        any());
 
         doAnswer(invocationOnMock -> {
             ((OpenCursorCallback) 
invocationOnMock.getArguments()[4]).openCursorComplete(cursorMock, null);
@@ -1462,10 +1479,10 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         }).when(ledgerMock).asyncDeleteCursor(matches(".*success.*"), 
any(DeleteCursorCallback.class), any());
 
         doAnswer((invokactionOnMock) -> {
-                ((MarkDeleteCallback) invokactionOnMock.getArguments()[2])
+            ((MarkDeleteCallback) invokactionOnMock.getArguments()[2])
                     .markDeleteComplete(invokactionOnMock.getArguments()[3]);
-                return null;
-            }).when(cursorMock).asyncMarkDelete(any(), any(), 
any(MarkDeleteCallback.class), any());
+            return null;
+        }).when(cursorMock).asyncMarkDelete(any(), any(), 
any(MarkDeleteCallback.class), any());
     }
 
 
@@ -1683,20 +1700,26 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         String remoteReplicatorName = topic.getReplicatorPrefix() + "." + 
remoteCluster;
         ConcurrentOpenHashMap<String, Replicator> replicatorMap = 
topic.getReplicators();
 
-        PulsarService pulsar = testPulsarServiceFactory.getPulsarService();
-        final URL brokerUrl = new URL(
-                "http://" + pulsar.getAdvertisedAddress() + ":" + 
pulsar.getConfiguration().getBrokerServicePort()
-                        .get());
-        @Cleanup
-        PulsarClient client = 
PulsarClient.builder().serviceUrl(brokerUrl.toString()).build();
         ManagedCursor cursor = mock(ManagedCursorImpl.class);
         doReturn(remoteCluster).when(cursor).getName();
-        brokerService.getReplicationClients().put(remoteCluster, client);
+        PulsarClientImpl pulsarClientMock = mock(PulsarClientImpl.class);
+        when(pulsarClientMock.newProducer(any())).thenAnswer(
+                invocation -> {
+                    ProducerBuilderImpl producerBuilder =
+                            new ProducerBuilderImpl(pulsarClientMock, 
invocation.getArgument(0)) {
+                                @Override
+                                public 
CompletableFuture<org.apache.pulsar.client.api.Producer> createAsync() {
+                                    return 
CompletableFuture.completedFuture(mock(org.apache.pulsar.client.api.Producer.class));
+                                }
+                            };
+                    return producerBuilder;
+                });
+        brokerService.getReplicationClients().put(remoteCluster, 
pulsarClientMock);
         PersistentReplicator replicator = spy(
                 new GeoPersistentReplicator(topic, cursor, localCluster, 
remoteCluster, brokerService,
                         (PulsarClientImpl) 
brokerService.getReplicationClient(remoteCluster,
                                 
brokerService.pulsar().getPulsarResources().getClusterResources()
-                                .getCluster(remoteCluster))));
+                                        .getCluster(remoteCluster))));
         replicatorMap.put(remoteReplicatorName, replicator);
 
         // step-1 remove replicator : it will disconnect the producer but it 
will wait for callback to be completed
@@ -1730,7 +1753,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
 
         PersistentTopic topic = new PersistentTopic(globalTopicName, 
ledgerMock, brokerService);
 
-        PulsarService pulsar = testPulsarServiceFactory.getPulsarService();
+        PulsarService pulsar = pulsarTestContext.getPulsarService();
         final URL brokerUrl = new URL(
                 "http://" + pulsar.getAdvertisedAddress() + ":" + 
pulsar.getConfiguration().getBrokerServicePort()
                         .get());
@@ -1738,7 +1761,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         PulsarClient client = 
spy(PulsarClient.builder().serviceUrl(brokerUrl.toString()).build());
         PulsarClientImpl clientImpl = (PulsarClientImpl) client;
         doReturn(new CompletableFuture<Producer>()).when(clientImpl)
-            .createProducerAsync(any(ProducerConfigurationData.class), 
any(Schema.class));
+                .createProducerAsync(any(ProducerConfigurationData.class), 
any(Schema.class));
 
         ManagedCursor cursor = mock(ManagedCursorImpl.class);
         doReturn(remoteCluster).when(cursor).getName();
@@ -1750,10 +1773,10 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
 
         // PersistentReplicator constructor calls startProducer()
         verify(clientImpl)
-            .createProducerAsync(
-                any(ProducerConfigurationData.class),
-                any(), eq(null)
-            );
+                .createProducerAsync(
+                        any(ProducerConfigurationData.class),
+                        any(), eq(null)
+                );
 
         replicator.disconnect(false);
         replicator.disconnect(false);
@@ -1770,8 +1793,8 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         when(compactedTopic.newCompactedLedger(any(Position.class), anyLong()))
                 
.thenReturn(CompletableFuture.completedFuture(mock(CompactedTopicContext.class)));
         PersistentSubscription sub = new CompactorSubscription(topic, 
compactedTopic,
-                                                               
Compactor.COMPACTION_SUBSCRIPTION,
-                                                               cursorMock);
+                Compactor.COMPACTION_SUBSCRIPTION,
+                cursorMock);
         PositionImpl position = new PositionImpl(1, 1);
         long ledgerId = 0xc0bfefeL;
         sub.acknowledgeMessage(Collections.singletonList(position), 
AckType.Cumulative,
@@ -1800,13 +1823,13 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
     @Test
     public void testCompactionTriggeredAfterThresholdFirstInvocation() throws 
Exception {
         CompletableFuture<Long> compactPromise = new CompletableFuture<>();
-        Compactor compactor = 
testPulsarServiceFactory.getPulsarService().getCompactor();
+        Compactor compactor = 
pulsarTestContext.getPulsarService().getCompactor();
         doReturn(compactPromise).when(compactor).compact(anyString());
 
         Policies policies = new Policies();
         policies.compaction_threshold = 1L;
 
-        testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
+        pulsarTestContext.getPulsarResources().getNamespaceResources()
                 
.createPolicies(TopicName.get(successTopicName).getNamespaceObject(),
                         policies);
 
@@ -1831,7 +1854,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
     @Test
     public void testCompactionTriggeredAfterThresholdSecondInvocation() throws 
Exception {
         CompletableFuture<Long> compactPromise = new CompletableFuture<>();
-        Compactor compactor = 
testPulsarServiceFactory.getPulsarService().getCompactor();
+        Compactor compactor = 
pulsarTestContext.getPulsarService().getCompactor();
         doReturn(compactPromise).when(compactor).compact(anyString());
 
         ManagedCursor subCursor = mock(ManagedCursor.class);
@@ -1841,7 +1864,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         Policies policies = new Policies();
         policies.compaction_threshold = 1L;
 
-        testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
+        pulsarTestContext.getPulsarResources().getNamespaceResources()
                 
.createPolicies(TopicName.get(successTopicName).getNamespaceObject(),
                         policies);
 
@@ -1865,13 +1888,13 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
     @Test
     public void testCompactionDisabledWithZeroThreshold() throws Exception {
         CompletableFuture<Long> compactPromise = new CompletableFuture<>();
-        Compactor compactor = 
testPulsarServiceFactory.getPulsarService().getCompactor();
+        Compactor compactor = 
pulsarTestContext.getPulsarService().getCompactor();
         doReturn(compactPromise).when(compactor).compact(anyString());
 
         Policies policies = new Policies();
         policies.compaction_threshold = 0L;
 
-        testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
+        pulsarTestContext.getPulsarResources().getNamespaceResources()
                 
.createPolicies(TopicName.get(successTopicName).getNamespaceObject(),
                         policies);
 
@@ -1886,7 +1909,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
     @Test
     public void testBacklogCursor() throws Exception {
         int backloggedThreshold = 10;
-        
testPulsarServiceFactory.getConfig().setManagedLedgerCursorBackloggedThreshold(backloggedThreshold);
+        
pulsarTestContext.getConfig().setManagedLedgerCursorBackloggedThreshold(backloggedThreshold);
 
         ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
factory.open("cache_backlog_ledger");
         PersistentTopic topic = new PersistentTopic(successTopicName, ledger, 
brokerService);
@@ -1895,22 +1918,28 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         // Open cursor1, add it into activeCursor-container and add it into 
subscription consumer list
         ManagedCursor cursor1 = ledger.openCursor("c1");
         PersistentSubscription sub1 = new PersistentSubscription(topic, 
"sub-1", cursor1, false);
-        Consumer consumer1 = new Consumer(sub1, SubType.Exclusive, 
topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */,
-            true, serverCnx, "myrole-1", Collections.emptyMap(), false /* read 
compacted */, null, MessageId.latest, DEFAULT_CONSUMER_EPOCH);
+        Consumer consumer1 = new Consumer(sub1, SubType.Exclusive, 
topic.getName(), 1 /* consumer id */, 0,
+                "Cons1"/* consumer name */,
+                true, serverCnx, "myrole-1", Collections.emptyMap(), false /* 
read compacted */, null, MessageId.latest,
+                DEFAULT_CONSUMER_EPOCH);
         topic.getSubscriptions().put(Codec.decode(cursor1.getName()), sub1);
         sub1.addConsumer(consumer1);
         // Open cursor2, add it into activeCursor-container and add it into 
subscription consumer list
         ManagedCursor cursor2 = ledger.openCursor("c2");
         PersistentSubscription sub2 = new PersistentSubscription(topic, 
"sub-2", cursor2, false);
-        Consumer consumer2 = new Consumer(sub2, SubType.Exclusive, 
topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */,
-            true, serverCnx, "myrole-2", Collections.emptyMap(), false /* read 
compacted */, null, MessageId.latest, DEFAULT_CONSUMER_EPOCH);
+        Consumer consumer2 = new Consumer(sub2, SubType.Exclusive, 
topic.getName(), 2 /* consumer id */, 0,
+                "Cons2"/* consumer name */,
+                true, serverCnx, "myrole-2", Collections.emptyMap(), false /* 
read compacted */, null, MessageId.latest,
+                DEFAULT_CONSUMER_EPOCH);
         topic.getSubscriptions().put(Codec.decode(cursor2.getName()), sub2);
         sub2.addConsumer(consumer2);
         // Open cursor3, add it into activeCursor-container and do not add it 
into subscription consumer list
         ManagedCursor cursor3 = ledger.openCursor("c3");
         PersistentSubscription sub3 = new PersistentSubscription(topic, 
"sub-3", cursor3, false);
-        Consumer consumer3 = new Consumer(sub2, SubType.Exclusive, 
topic.getName(), 3 /* consumer id */, 0, "Cons2"/* consumer name */,
-            true, serverCnx, "myrole-3", Collections.emptyMap(), false /* read 
compacted */, null, MessageId.latest, DEFAULT_CONSUMER_EPOCH);
+        Consumer consumer3 = new Consumer(sub2, SubType.Exclusive, 
topic.getName(), 3 /* consumer id */, 0,
+                "Cons2"/* consumer name */,
+                true, serverCnx, "myrole-3", Collections.emptyMap(), false /* 
read compacted */, null, MessageId.latest,
+                DEFAULT_CONSUMER_EPOCH);
         topic.getSubscriptions().put(Codec.decode(cursor3.getName()), sub3);
 
         // Case1: cursors are active as haven't started 
deactivateBacklogCursor scan
@@ -2006,13 +2035,19 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
                         .concurrencyLevel(1)
                         .build();
         // This subscription is connected by consumer.
-        PersistentSubscription nonDeletableSubscription1 = 
spyWithClassAndConstructorArgsRecordingInvocations(PersistentSubscription.class,
 topic, "nonDeletableSubscription1", cursorMock, false);
+        PersistentSubscription nonDeletableSubscription1 =
+                
spyWithClassAndConstructorArgsRecordingInvocations(PersistentSubscription.class,
 topic,
+                        "nonDeletableSubscription1", cursorMock, false);
         subscriptions.put(nonDeletableSubscription1.getName(), 
nonDeletableSubscription1);
         // This subscription is not connected by consumer.
-        PersistentSubscription deletableSubscription1 = 
spyWithClassAndConstructorArgsRecordingInvocations(PersistentSubscription.class,
 topic, "deletableSubscription1", cursorMock, false);
+        PersistentSubscription deletableSubscription1 =
+                
spyWithClassAndConstructorArgsRecordingInvocations(PersistentSubscription.class,
 topic,
+                        "deletableSubscription1", cursorMock, false);
         subscriptions.put(deletableSubscription1.getName(), 
deletableSubscription1);
         // This subscription is replicated.
-        PersistentSubscription nonDeletableSubscription2 = 
spyWithClassAndConstructorArgsRecordingInvocations(PersistentSubscription.class,
 topic, "nonDeletableSubscription2", cursorMock, true);
+        PersistentSubscription nonDeletableSubscription2 =
+                
spyWithClassAndConstructorArgsRecordingInvocations(PersistentSubscription.class,
 topic,
+                        "nonDeletableSubscription2", cursorMock, true);
         subscriptions.put(nonDeletableSubscription2.getName(), 
nonDeletableSubscription2);
 
         Field field = topic.getClass().getDeclaredField("subscriptions");
@@ -2027,11 +2062,11 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
                 true, serverCnx, "app1", Collections.emptyMap(), false, null, 
MessageId.latest, DEFAULT_CONSUMER_EPOCH);
         addConsumerToSubscription.invoke(topic, nonDeletableSubscription1, 
consumer);
 
-        testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
+        pulsarTestContext.getPulsarResources().getNamespaceResources()
                 
.createPolicies(TopicName.get(successTopicName).getNamespaceObject(),
                         new Policies());
 
-        
testPulsarServiceFactory.getConfig().setSubscriptionExpirationTimeMinutes(5);
+        pulsarTestContext.getConfig().setSubscriptionExpirationTimeMinutes(5);
 
         doReturn(System.currentTimeMillis() - 
TimeUnit.MINUTES.toMillis(6)).when(cursorMock).getLastActive();
 
@@ -2058,7 +2093,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         Field isClosingOrDeletingField = 
PersistentTopic.class.getDeclaredField("isClosingOrDeleting");
         isClosingOrDeletingField.setAccessible(true);
 
-        testPulsarServiceFactory.getConfig().setTopicFencingTimeoutSeconds(10);
+        pulsarTestContext.getConfig().setTopicFencingTimeoutSeconds(10);
         fence.invoke(topic);
         unfence.invoke(topic);
         ScheduledFuture<?> fencedTopicMonitoringTask = (ScheduledFuture<?>) 
fencedTopicMonitoringTaskField.get(topic);
@@ -2067,7 +2102,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         assertFalse((boolean) isFencedField.get(topic));
         assertFalse((boolean) isClosingOrDeletingField.get(topic));
 
-        testPulsarServiceFactory.getConfig().setTopicFencingTimeoutSeconds(1);
+        pulsarTestContext.getConfig().setTopicFencingTimeoutSeconds(1);
         fence.invoke(topic);
         Thread.sleep(2000);
         fencedTopicMonitoringTask = (ScheduledFuture<?>) 
fencedTopicMonitoringTaskField.get(topic);
@@ -2079,7 +2114,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
 
     @Test
     public void testTopicCloseFencingTimeout() throws Exception {
-        testPulsarServiceFactory.getConfig().setTopicFencingTimeoutSeconds(10);
+        pulsarTestContext.getConfig().setTopicFencingTimeoutSeconds(10);
         Method fence = PersistentTopic.class.getDeclaredMethod("fence");
         fence.setAccessible(true);
         Field fencedTopicMonitoringTaskField = 
PersistentTopic.class.getDeclaredField("fencedTopicMonitoringTask");
@@ -2106,7 +2141,8 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         Position mockPosition = mock(Position.class);
         doReturn("test").when(mockCursor).getName();
         doAnswer((Answer<Object>) invocationOnMock -> {
-            ((AsyncCallbacks.FindEntryCallback) 
invocationOnMock.getArguments()[2]).findEntryComplete(mockPosition, 
invocationOnMock.getArguments()[3]);
+            ((AsyncCallbacks.FindEntryCallback) 
invocationOnMock.getArguments()[2]).findEntryComplete(mockPosition,
+                    invocationOnMock.getArguments()[3]);
             return null;
         }).when(mockCursor).asyncFindNewestMatching(any(), any(), any(), 
any());
         doAnswer((Answer<Object>) invocationOnMock -> {
@@ -2184,7 +2220,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         assertEquals(stats2.keySharedMode, "AUTO_SPLIT");
         assertTrue(stats2.allowOutOfOrderDelivery);
 
-        KeySharedMeta ksm =  new 
KeySharedMeta().setKeySharedMode(KeySharedMode.STICKY)
+        KeySharedMeta ksm = new 
KeySharedMeta().setKeySharedMode(KeySharedMode.STICKY)
                 .setAllowOutOfOrderDelivery(false);
         ksm.addHashRange().setStart(0).setEnd(65535);
         Consumer consumer3 = new Consumer(sub3, SubType.Key_Shared, 
topic.getName(), 3, 0, "Cons3", true, serverCnx,
@@ -2222,7 +2258,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         Set<String> namespaceClusters = new HashSet<>();
         namespaceClusters.add("namespace-cluster");
         policies.replication_clusters = namespaceClusters;
-        testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
+        pulsarTestContext.getPulsarResources().getNamespaceResources()
                 
.createPolicies(TopicName.get(successTopicName).getNamespaceObject(), policies);
 
         topic = new PersistentTopic(successTopicName, ledgerMock, 
brokerService);
@@ -2230,7 +2266,7 @@ public class PersistentTopicTest extends 
MockedBookKeeperTestCase {
         
assertEquals(topic.getHierarchyTopicPolicies().getReplicationClusters().get(), 
namespaceClusters);
 
         TopicPoliciesService topicPoliciesService = 
mock(TopicPoliciesService.class);
-        
doReturn(topicPoliciesService).when(testPulsarServiceFactory.getPulsarService()).getTopicPoliciesService();
+        
doReturn(topicPoliciesService).when(pulsarTestContext.getPulsarService()).getTopicPoliciesService();
         CompletableFuture<Optional<TopicPolicies>> topicPoliciesFuture = new 
CompletableFuture<>();
         TopicPolicies topicPolicies = new TopicPolicies();
         List<String> topicClusters = new ArrayList<>();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxAuthorizationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxAuthorizationTest.java
index 325cb8d84e4..c1e32c0886e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxAuthorizationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxAuthorizationTest.java
@@ -45,7 +45,7 @@ import java.util.Optional;
 import java.util.Properties;
 import javax.crypto.SecretKey;
 import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.TestPulsarService;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
@@ -76,7 +76,7 @@ public class ServerCnxAuthorizationTest {
 
     private ServiceConfiguration svcConfig;
 
-    protected TestPulsarService.Factory testPulsarServiceFactory;
+    protected PulsarTestContext pulsarTestContext;
     private BrokerService brokerService;
 
     @BeforeMethod(alwaysRun = true)
@@ -95,21 +95,21 @@ public class ServerCnxAuthorizationTest {
         properties.setProperty("tokenSecretKey", "data:;base64,"
                 + Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded()));
         svcConfig.setProperties(properties);
-        testPulsarServiceFactory = TestPulsarService.Factory.builder()
+        pulsarTestContext = PulsarTestContext.builder()
                 .config(svcConfig)
-                .useSpies(true)
+                .spyByDefault()
                 .build();
-        brokerService = testPulsarServiceFactory.getBrokerService();
+        brokerService = pulsarTestContext.getBrokerService();
 
-        
testPulsarServiceFactory.getPulsarResources().getTenantResources().createTenant("public",
+        
pulsarTestContext.getPulsarResources().getTenantResources().createTenant("public",
                 TenantInfo.builder().build());
     }
 
     @AfterMethod(alwaysRun = true)
     public void cleanup() throws Exception {
-        if (testPulsarServiceFactory != null) {
-            testPulsarServiceFactory.close();
-            testPulsarServiceFactory = null;
+        if (pulsarTestContext != null) {
+            pulsarTestContext.close();
+            pulsarTestContext = null;
         }
     }
 
@@ -118,7 +118,7 @@ public class ServerCnxAuthorizationTest {
         svcConfig.setAuthenticateOriginalAuthData(true);
 
 
-        ServerCnx serverCnx = testPulsarServiceFactory.createServerCnxSpy();
+        ServerCnx serverCnx = pulsarTestContext.createServerCnxSpy();
         ChannelHandlerContext channelHandlerContext = 
mock(ChannelHandlerContext.class);
         Channel channel = mock(Channel.class);
         ChannelPipeline channelPipeline = mock(ChannelPipeline.class);
@@ -155,7 +155,7 @@ public class ServerCnxAuthorizationTest {
 
         AuthorizationService authorizationService =
                 
spyWithClassAndConstructorArgsRecordingInvocations(AuthorizationService.class, 
svcConfig,
-                        testPulsarServiceFactory.getPulsarResources());
+                        pulsarTestContext.getPulsarResources());
         
doReturn(authorizationService).when(brokerService).getAuthorizationService();
 
         // lookup
@@ -225,7 +225,7 @@ public class ServerCnxAuthorizationTest {
     public void testVerifyOriginalPrincipalWithoutAuthDataForwardedFromProxy() 
throws Exception {
         svcConfig.setAuthenticateOriginalAuthData(false);
 
-        ServerCnx serverCnx = testPulsarServiceFactory.createServerCnxSpy();
+        ServerCnx serverCnx = pulsarTestContext.createServerCnxSpy();
         ChannelHandlerContext channelHandlerContext = 
mock(ChannelHandlerContext.class);
         Channel channel = mock(Channel.class);
         ChannelPipeline channelPipeline = mock(ChannelPipeline.class);
@@ -257,7 +257,7 @@ public class ServerCnxAuthorizationTest {
 
         AuthorizationService authorizationService =
                 
spyWithClassAndConstructorArgsRecordingInvocations(AuthorizationService.class, 
svcConfig,
-                        testPulsarServiceFactory.getPulsarResources());
+                        pulsarTestContext.getPulsarResources());
         
doReturn(authorizationService).when(brokerService).getAuthorizationService();
 
         // lookup
@@ -318,7 +318,7 @@ public class ServerCnxAuthorizationTest {
 
     @Test
     public void testVerifyAuthRoleAndAuthDataFromDirectConnectionBroker() 
throws Exception {
-        ServerCnx serverCnx = testPulsarServiceFactory.createServerCnxSpy();
+        ServerCnx serverCnx = pulsarTestContext.createServerCnxSpy();
 
         ChannelHandlerContext channelHandlerContext = 
mock(ChannelHandlerContext.class);
         Channel channel = mock(Channel.class);
@@ -350,7 +350,7 @@ public class ServerCnxAuthorizationTest {
 
         AuthorizationService authorizationService =
                 
spyWithClassAndConstructorArgsRecordingInvocations(AuthorizationService.class, 
svcConfig,
-                        testPulsarServiceFactory.getPulsarResources());
+                        pulsarTestContext.getPulsarResources());
         
doReturn(authorizationService).when(brokerService).getAuthorizationService();
 
         // lookup
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index c4088d2e124..b990ced12b1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -71,7 +71,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.TestPulsarService;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
 import org.apache.pulsar.broker.TransactionMetadataStoreService;
 import org.apache.pulsar.broker.auth.MockAuthenticationProvider;
 import org.apache.pulsar.broker.auth.MockMultiStageAuthenticationProvider;
@@ -143,7 +143,7 @@ public class ServerCnxTest {
     private ServiceConfiguration svcConfig;
     private ServerCnx serverCnx;
 
-    protected TestPulsarService.Factory testPulsarServiceFactory;
+    protected PulsarTestContext pulsarTestContext;
 
     protected PulsarService pulsar;
     protected BrokerService brokerService;
@@ -181,13 +181,13 @@ public class ServerCnxTest {
         svcConfig.setKeepAliveIntervalSeconds(inSec(1, TimeUnit.SECONDS));
         svcConfig.setBacklogQuotaCheckEnabled(false);
         svcConfig.setClusterName("use");
-        testPulsarServiceFactory = TestPulsarService.Factory.builder()
+        pulsarTestContext = PulsarTestContext.builder()
                 .config(svcConfig)
-                .useSpies(true)
+                .spyByDefault()
                 .build();
-        pulsar = testPulsarServiceFactory.getPulsarService();
+        pulsar = pulsarTestContext.getPulsarService();
 
-        brokerService = testPulsarServiceFactory.getBrokerService();
+        brokerService = pulsarTestContext.getBrokerService();
 
         namespaceService = pulsar.getNamespaceService();
         
doReturn(CompletableFuture.completedFuture(null)).when(namespaceService).getBundleAsync(any());
@@ -217,9 +217,9 @@ public class ServerCnxTest {
         if (channel != null) {
             channel.close();
         }
-        if (testPulsarServiceFactory != null) {
-            testPulsarServiceFactory.close();
-            testPulsarServiceFactory = null;
+        if (pulsarTestContext != null) {
+            pulsarTestContext.close();
+            pulsarTestContext = null;
         }
     }
 
@@ -895,7 +895,7 @@ public class ServerCnxTest {
                     () -> ((OpenLedgerCallback) 
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock,
                             null));
             return null;
-        }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+        }).when(pulsarTestContext.getManagedLedgerFactory())
                 .asyncOpen(matches(".*success.*"), 
any(ManagedLedgerConfig.class),
                         any(OpenLedgerCallback.class), any(Supplier.class), 
any());
 
@@ -950,7 +950,7 @@ public class ServerCnxTest {
                     () -> ((OpenLedgerCallback) 
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock,
                             null));
             return null;
-        }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+        }).when(pulsarTestContext.getManagedLedgerFactory())
                 .asyncOpen(matches(".*success.*"), 
any(ManagedLedgerConfig.class),
                         any(OpenLedgerCallback.class), any(Supplier.class), 
any());
 
@@ -1017,7 +1017,7 @@ public class ServerCnxTest {
 
             ((OpenLedgerCallback) 
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
             return null;
-        }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+        }).when(pulsarTestContext.getManagedLedgerFactory())
                 .asyncOpen(matches(".*success.*"), 
any(ManagedLedgerConfig.class),
                         any(OpenLedgerCallback.class), any(Supplier.class), 
any());
 
@@ -1096,7 +1096,7 @@ public class ServerCnxTest {
                     () -> ((OpenLedgerCallback) 
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock,
                             null));
             return null;
-        }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+        }).when(pulsarTestContext.getManagedLedgerFactory())
                 .asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class),
                         any(OpenLedgerCallback.class), any(Supplier.class), 
any());
 
@@ -1168,7 +1168,7 @@ public class ServerCnxTest {
                             null));
 
             return null;
-        }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+        }).when(pulsarTestContext.getManagedLedgerFactory())
                 .asyncOpen(matches(".*success.*"), 
any(ManagedLedgerConfig.class),
                         any(OpenLedgerCallback.class), any(Supplier.class), 
any());
 
@@ -1243,7 +1243,7 @@ public class ServerCnxTest {
                     () -> ((OpenLedgerCallback) 
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock,
                             null));
             return null;
-        }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+        }).when(pulsarTestContext.getManagedLedgerFactory())
                 .asyncOpen(matches(".*success.*"), 
any(ManagedLedgerConfig.class),
                         any(OpenLedgerCallback.class), any(Supplier.class), 
any());
 
@@ -1252,7 +1252,7 @@ public class ServerCnxTest {
             openTopicFail.complete(() -> ((OpenLedgerCallback) 
invocationOnMock.getArguments()[2])
                     .openLedgerFailed(new ManagedLedgerException("Managed 
ledger failure"), null));
             return null;
-        }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+        }).when(pulsarTestContext.getManagedLedgerFactory())
                 .asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class),
                         any(OpenLedgerCallback.class), any(Supplier.class), 
any());
 
@@ -1494,7 +1494,7 @@ public class ServerCnxTest {
         // add `clusterDispatchRate` otherwise there will be a NPE
         // 
`org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceReplicatorDispatchRate`
         policies.replicatorDispatchRate = new HashMap<>();
-        testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
+        pulsarTestContext.getPulsarResources().getNamespaceResources()
                 
.createPolicies(TopicName.get(encryptionRequiredTopicName).getNamespaceObject(),
                         policies);
 
@@ -1532,7 +1532,7 @@ public class ServerCnxTest {
         // add `clusterDispatchRate` otherwise there will be a NPE
         // 
`org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceReplicatorDispatchRate`
         policies.replicatorDispatchRate = new HashMap<>();
-        testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
+        pulsarTestContext.getPulsarResources().getNamespaceResources()
                 
.createPolicies(TopicName.get(encryptionRequiredTopicName).getNamespaceObject(),
                         policies);
 
@@ -1574,7 +1574,7 @@ public class ServerCnxTest {
         // add `clusterDispatchRate` otherwise there will be a NPE
         // 
`org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceReplicatorDispatchRate`
         policies.replicatorDispatchRate = new HashMap<>();
-        testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
+        pulsarTestContext.getPulsarResources().getNamespaceResources()
                 
.createPolicies(TopicName.get(encryptionRequiredTopicName).getNamespaceObject(),
                         policies);
 
@@ -1614,7 +1614,7 @@ public class ServerCnxTest {
         // add `clusterDispatchRate` otherwise there will be a NPE
         // 
`org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceReplicatorDispatchRate`
         policies.replicatorDispatchRate = new HashMap<>();
-        testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
+        pulsarTestContext.getPulsarResources().getNamespaceResources()
                 
.createPolicies(TopicName.get(encryptionRequiredTopicName).getNamespaceObject(),
                         policies);
 
@@ -1660,7 +1660,7 @@ public class ServerCnxTest {
         // add `clusterDispatchRate` otherwise there will be a NPE
         // 
`org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceReplicatorDispatchRate`
         policies.replicatorDispatchRate = new HashMap<>();
-        testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
+        pulsarTestContext.getPulsarResources().getNamespaceResources()
                 
.createPolicies(TopicName.get(encryptionRequiredTopicName).getNamespaceObject(),
                         policies);
 
@@ -1740,7 +1740,7 @@ public class ServerCnxTest {
             Thread.sleep(300);
             ((OpenLedgerCallback) 
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
             return null;
-        }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+        }).when(pulsarTestContext.getManagedLedgerFactory())
                 .asyncOpen(matches(".*success.*"), 
any(ManagedLedgerConfig.class),
                         any(OpenLedgerCallback.class), any(Supplier.class), 
any());
 
@@ -1751,7 +1751,7 @@ public class ServerCnxTest {
                     .openLedgerFailed(new ManagedLedgerException("Managed 
ledger failure"), null)).start();
 
             return null;
-        }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+        }).when(pulsarTestContext.getManagedLedgerFactory())
                 .asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class),
                         any(OpenLedgerCallback.class), any(Supplier.class), 
any());
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherFailoverConsumerStreamingDispatcherTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherFailoverConsumerStreamingDispatcherTest.java
index 00e31c51c9c..f86fe0701dc 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherFailoverConsumerStreamingDispatcherTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherFailoverConsumerStreamingDispatcherTest.java
@@ -32,6 +32,6 @@ public class 
PersistentDispatcherFailoverConsumerStreamingDispatcherTest extends
     @BeforeMethod(alwaysRun = true)
     public void setup() throws Exception {
         super.setup();
-        testPulsarServiceFactory.getConfig().setStreamingDispatch(true);
+        pulsarTestContext.getConfig().setStreamingDispatch(true);
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicStreamingDispatcherTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicStreamingDispatcherTest.java
index d7699e0b817..440cbbe290c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicStreamingDispatcherTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicStreamingDispatcherTest.java
@@ -33,7 +33,7 @@ public class PersistentTopicStreamingDispatcherTest extends 
PersistentTopicTest
     @BeforeMethod(alwaysRun = true)
     public void setup() throws Exception {
         super.setup();
-        ServiceConfiguration config = testPulsarServiceFactory.getConfig();
+        ServiceConfiguration config = pulsarTestContext.getConfig();
         config.setTopicLevelPoliciesEnabled(false);
         config.setSystemTopicEnabled(false);
         config.setStreamingDispatch(true);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java
new file mode 100644
index 00000000000..e8a9063a1ea
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.testcontext;
+
+import org.apache.pulsar.broker.BookKeeperClientFactory;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.intercept.BrokerInterceptor;
+import org.apache.pulsar.broker.service.PulsarMetadataEventSynchronizer;
+import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+
+abstract class AbstractTestPulsarService extends PulsarService {
+    protected final MetadataStoreExtended localMetadataStore;
+    protected final MetadataStoreExtended configurationMetadataStore;
+    protected final Compactor compactor;
+    protected final BrokerInterceptor brokerInterceptor;
+    protected final BookKeeperClientFactory bookKeeperClientFactory;
+
+    public AbstractTestPulsarService(ServiceConfiguration config, 
MetadataStoreExtended localMetadataStore,
+                                     MetadataStoreExtended 
configurationMetadataStore, Compactor compactor,
+                                     BrokerInterceptor brokerInterceptor,
+                                     BookKeeperClientFactory 
bookKeeperClientFactory) {
+        super(config);
+        this.localMetadataStore =
+                
NonClosingProxyHandler.createNonClosingProxy(localMetadataStore, 
MetadataStoreExtended.class);
+        this.configurationMetadataStore =
+                
NonClosingProxyHandler.createNonClosingProxy(configurationMetadataStore, 
MetadataStoreExtended.class);
+        this.compactor = compactor;
+        this.brokerInterceptor = brokerInterceptor;
+        this.bookKeeperClientFactory = bookKeeperClientFactory;
+    }
+
+    @Override
+    public MetadataStore 
createConfigurationMetadataStore(PulsarMetadataEventSynchronizer synchronizer)
+            throws MetadataStoreException {
+
+        return configurationMetadataStore;
+    }
+
+    @Override
+    public MetadataStoreExtended 
createLocalMetadataStore(PulsarMetadataEventSynchronizer synchronizer)
+            throws MetadataStoreException, PulsarServerException {
+        return localMetadataStore;
+    }
+
+    @Override
+    public Compactor getCompactor() throws PulsarServerException {
+        if (compactor != null) {
+            return compactor;
+        } else {
+            return super.getCompactor();
+        }
+    }
+
+    @Override
+    public BrokerInterceptor getBrokerInterceptor() {
+        if (brokerInterceptor != null) {
+            return brokerInterceptor;
+        } else {
+            return super.getBrokerInterceptor();
+        }
+    }
+
+    @Override
+    public BookKeeperClientFactory newBookKeeperClientFactory() {
+        return bookKeeperClientFactory;
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/MockBookKeeperClientFactory.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/MockBookKeeperClientFactory.java
new file mode 100644
index 00000000000..c173be9046b
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/MockBookKeeperClientFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.testcontext;
+
+import io.netty.channel.EventLoopGroup;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.pulsar.broker.BookKeeperClientFactory;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+
+class MockBookKeeperClientFactory implements BookKeeperClientFactory {
+    private final BookKeeper mockBookKeeper;
+
+    MockBookKeeperClientFactory(BookKeeper mockBookKeeper) {
+        this.mockBookKeeper = mockBookKeeper;
+    }
+
+    @Override
+    public BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended 
store,
+                             EventLoopGroup eventLoopGroup,
+                             Optional<Class<? extends 
EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
+                             Map<String, Object> properties) {
+        // Always return the same instance (so that we don't lose the mock BK 
content on broker restart)
+        return mockBookKeeper;
+    }
+
+    @Override
+    public BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended 
store,
+                             EventLoopGroup eventLoopGroup,
+                             Optional<Class<? extends 
EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
+                             Map<String, Object> properties, StatsLogger 
statsLogger) {
+        // Always return the same instance (so that we don't lose the mock BK 
content on broker restart)
+        return mockBookKeeper;
+    }
+
+    @Override
+    public void close() {
+        // no-op
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherFailoverConsumerStreamingDispatcherTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonClosableMockBookKeeper.java
similarity index 53%
copy from 
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherFailoverConsumerStreamingDispatcherTest.java
copy to 
pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonClosableMockBookKeeper.java
index 00e31c51c9c..23cc07d3bdf 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherFailoverConsumerStreamingDispatcherTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonClosableMockBookKeeper.java
@@ -16,22 +16,29 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker.service.persistent;
+package org.apache.pulsar.broker.testcontext;
 
-import 
org.apache.pulsar.broker.service.PersistentDispatcherFailoverConsumerTest;
-import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
+import org.apache.bookkeeper.client.PulsarMockBookKeeper;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
 
-/**
- * PersistentDispatcherFailoverConsumerTest with {@link StreamingDispatcher}
- */
-@Test(groups = "quarantine")
-public class PersistentDispatcherFailoverConsumerStreamingDispatcherTest 
extends PersistentDispatcherFailoverConsumerTest {
+// Prevent the MockBookKeeper instance from being closed when the broker is 
restarted within a test
+class NonClosableMockBookKeeper extends PulsarMockBookKeeper {
+
+    public NonClosableMockBookKeeper(OrderedExecutor executor) throws 
Exception {
+        super(executor);
+    }
+
+    @Override
+    public void close() {
+        // no-op
+    }
+
+    @Override
+    public void shutdown() {
+        // no-op
+    }
 
-    @BeforeMethod(alwaysRun = true)
-    public void setup() throws Exception {
-        super.setup();
-        testPulsarServiceFactory.getConfig().setStreamingDispatch(true);
+    public void reallyShutdown() {
+        super.shutdown();
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonClosingProxyHandler.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonClosingProxyHandler.java
new file mode 100644
index 00000000000..3156d25a74e
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonClosingProxyHandler.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.testcontext;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+
+public class NonClosingProxyHandler implements InvocationHandler {
+    private final AutoCloseable delegate;
+
+    NonClosingProxyHandler(AutoCloseable delegate) {
+        this.delegate = delegate;
+    }
+
+    public static <T extends AutoCloseable> T createNonClosingProxy(T 
delegate, Class<T> interfaceClass) {
+        if (isNonClosingProxy(delegate)) {
+            return delegate;
+        }
+        return 
interfaceClass.cast(Proxy.newProxyInstance(delegate.getClass().getClassLoader(),
+                new Class<?>[] {interfaceClass}, new 
NonClosingProxyHandler(delegate)));
+    }
+
+    public static boolean isNonClosingProxy(Object instance) {
+        return Proxy.isProxyClass(instance.getClass())
+                && Proxy.getInvocationHandler(instance) instanceof 
NonClosingProxyHandler;
+    }
+
+    public static <T extends I, I extends AutoCloseable> I getDelegate(T 
instance) {
+        if (isNonClosingProxy(instance)) {
+            return (T) ((NonClosingProxyHandler) 
Proxy.getInvocationHandler(instance)).getDelegate();
+        } else {
+            throw new IllegalArgumentException("not a proxy instance with 
NonClosingProxyHandler");
+        }
+    }
+
+    public static <T extends AutoCloseable> void reallyClose(T instance) 
throws Exception {
+        if (isNonClosingProxy(instance)) {
+            getDelegate(instance).close();
+        } else {
+            instance.close();
+        }
+    }
+
+    public AutoCloseable getDelegate() {
+        return delegate;
+    }
+
+    @Override
+    public Object invoke(Object proxy, Method method, Object[] args) throws 
Throwable {
+        if (method.getName().equals("close")) {
+            return null;
+        } else {
+            return method.invoke(delegate, args);
+        }
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java
new file mode 100644
index 00000000000..c65798ed7f6
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.testcontext;
+
+import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.mockito.Mockito.mock;
+import io.netty.channel.EventLoopGroup;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+import org.apache.pulsar.broker.BookKeeperClientFactory;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.intercept.BrokerInterceptor;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.resources.NamespaceResources;
+import org.apache.pulsar.broker.resources.PulsarResources;
+import org.apache.pulsar.broker.resources.TopicResources;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService;
+import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
+import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+
+/**
+ * Subclass of PulsarService that is used for some tests.
+ * This was written as a replacement for the previous Mockito Spy over 
PulsarService solution which caused
+ * a flaky test issue https://github.com/apache/pulsar/issues/13620.
+ */
+
+class NonStartableTestPulsarService extends AbstractTestPulsarService {
+    private final PulsarResources pulsarResources;
+    private final ManagedLedgerStorage managedLedgerClientFactory;
+    private final BrokerService brokerService;
+
+    private final SchemaRegistryService schemaRegistryService;
+
+    private final PulsarClientImpl pulsarClient;
+
+    private final NamespaceService namespaceService;
+
+    public NonStartableTestPulsarService(SpyConfig spyConfig, 
ServiceConfiguration config,
+                                            MetadataStoreExtended 
localMetadataStore,
+                                            MetadataStoreExtended 
configurationMetadataStore,
+                                            Compactor compactor, 
BrokerInterceptor brokerInterceptor,
+                                            BookKeeperClientFactory 
bookKeeperClientFactory,
+                                            PulsarResources pulsarResources,
+                                            ManagedLedgerStorage 
managedLedgerClientFactory) {
+        super(config, localMetadataStore, configurationMetadataStore, 
compactor, brokerInterceptor,
+                bookKeeperClientFactory);
+        this.pulsarResources = pulsarResources;
+        this.managedLedgerClientFactory = managedLedgerClientFactory;
+        try {
+            this.brokerService = 
spyConfig.getBrokerService().spy(TestBrokerService.class, this, 
getIoEventLoopGroup());
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        this.schemaRegistryService = 
spyWithClassAndConstructorArgs(DefaultSchemaRegistryService.class);
+        this.pulsarClient = mock(PulsarClientImpl.class);
+        this.namespaceService = mock(NamespaceService.class);
+        try {
+            startNamespaceService();
+        } catch (PulsarServerException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void start() throws PulsarServerException {
+        throw new UnsupportedOperationException("Cannot start a non-startable 
TestPulsarService");
+    }
+
+    @Override
+    public Supplier<NamespaceService> getNamespaceServiceProvider() throws 
PulsarServerException {
+        return () -> namespaceService;
+    }
+
+    @Override
+    public synchronized PulsarClient getClient() throws PulsarServerException {
+        return pulsarClient;
+    }
+
+    @Override
+    public PulsarClientImpl createClientImpl(ClientConfigurationData 
clientConf) throws PulsarClientException {
+        return pulsarClient;
+    }
+
+    @Override
+    public SchemaRegistryService getSchemaRegistryService() {
+        return schemaRegistryService;
+    }
+
+    @Override
+    public PulsarResources getPulsarResources() {
+        return pulsarResources;
+    }
+
+    public BrokerService getBrokerService() {
+        return brokerService;
+    }
+
+    @Override
+    public MetadataStore getConfigurationMetadataStore() {
+        return configurationMetadataStore;
+    }
+
+    @Override
+    public MetadataStoreExtended getLocalMetadataStore() {
+        return localMetadataStore;
+    }
+
+    @Override
+    public ManagedLedgerStorage getManagedLedgerClientFactory() {
+        return managedLedgerClientFactory;
+    }
+
+    @Override
+    protected PulsarResources newPulsarResources() {
+        return pulsarResources;
+    }
+
+    @Override
+    protected ManagedLedgerStorage newManagedLedgerClientFactory() throws 
Exception {
+        return managedLedgerClientFactory;
+    }
+
+    @Override
+    protected BrokerService newBrokerService(PulsarService pulsar) throws 
Exception {
+        return brokerService;
+    }
+
+    @Override
+    public BookKeeperClientFactory getBookKeeperClientFactory() {
+        return bookKeeperClientFactory;
+    }
+
+    static class TestBrokerService extends BrokerService {
+
+        TestBrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) 
throws Exception {
+            super(pulsar, eventLoopGroup);
+        }
+
+        @Override
+        protected CompletableFuture<Map<String, String>> 
fetchTopicPropertiesAsync(TopicName topicName) {
+            return CompletableFuture.completedFuture(Collections.emptyMap());
+        }
+    }
+
+    static class TestPulsarResources extends PulsarResources {
+
+        private final TopicResources topicResources;
+        private final NamespaceResources namespaceResources;
+
+        public TestPulsarResources(MetadataStore localMetadataStore, 
MetadataStore configurationMetadataStore,
+                                   TopicResources topicResources, 
NamespaceResources namespaceResources) {
+            super(localMetadataStore, configurationMetadataStore);
+            this.topicResources = topicResources;
+            this.namespaceResources = namespaceResources;
+        }
+
+        @Override
+        public TopicResources getTopicResources() {
+            return topicResources;
+        }
+
+        @Override
+        public NamespaceResources getNamespaceResources() {
+            return namespaceResources;
+        }
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
new file mode 100644
index 00000000000..f42c3cb146e
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.testcontext;
+
+import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgsRecordingInvocations;
+import static org.mockito.Mockito.mock;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.netty.channel.EventLoopGroup;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import lombok.AccessLevel;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.Singular;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.stats.NullStatsProvider;
+import org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.pulsar.broker.BookKeeperClientFactory;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.intercept.BrokerInterceptor;
+import org.apache.pulsar.broker.resources.NamespaceResources;
+import org.apache.pulsar.broker.resources.PulsarResources;
+import org.apache.pulsar.broker.resources.TopicResources;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.ServerCnx;
+import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
+import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown;
+import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.impl.MetadataStoreFactoryImpl;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.MockZooKeeper;
+import org.apache.zookeeper.MockZooKeeperSession;
+import org.apache.zookeeper.data.ACL;
+import org.jetbrains.annotations.NotNull;
+
+@Slf4j
+@ToString
+@Getter
+@Builder(builderClassName = "Builder")
+public class PulsarTestContext implements AutoCloseable {
+    private final ServiceConfiguration config;
+    private final MetadataStoreExtended localMetadataStore;
+    private final MetadataStoreExtended configurationMetadataStore;
+    private final PulsarResources pulsarResources;
+
+    private final OrderedExecutor executor;
+
+    private final ManagedLedgerStorage managedLedgerClientFactory;
+
+    private final PulsarService pulsarService;
+
+    private final Compactor compactor;
+
+    private final BrokerService brokerService;
+
+    @Getter(AccessLevel.NONE)
+    @Singular("registerCloseable")
+    private final List<AutoCloseable> closeables;
+
+    private final BrokerInterceptor brokerInterceptor;
+
+    private final BookKeeper bookKeeperClient;
+
+    private final boolean startable;
+
+
+    public ManagedLedgerFactory getManagedLedgerFactory() {
+        return managedLedgerClientFactory.getManagedLedgerFactory();
+    }
+
+    public static Builder startableBuilder() {
+        return new StartableCustomBuilder();
+    }
+
+    public static Builder builder() {
+        return new NonStartableCustomBuilder();
+    }
+
+    /**
+     * Runs every registered cleanup action, last registered first.
+     * <p>
+     * A failure in one action is logged and does not stop the remaining actions from running, so exceptions
+     * thrown by individual actions are not propagated to the caller.
+     *
+     * @throws Exception declared by {@link AutoCloseable#close()}
+     */
+    @Override
+    public void close() throws Exception {
+        for (int i = closeables.size() - 1; i >= 0; i--) {
+            try {
+                closeables.get(i).close();
+            } catch (Exception e) {
+                // best-effort cleanup: keep going so later resources still get released
+                log.error("Failure in calling cleanup function", e);
+            }
+        }
+    }
+
+    public ServerCnx createServerCnxSpy() {
+        return 
spyWithClassAndConstructorArgsRecordingInvocations(ServerCnx.class,
+                getPulsarService());
+    }
+
+    public static class Builder {
+        protected boolean useTestPulsarResources = false;
+        protected MetadataStore pulsarResourcesMetadataStore;
+        protected Function<PulsarService, BrokerService> brokerServiceFunction;
+        protected SpyConfig.Builder spyConfigBuilder = 
SpyConfig.builder(SpyConfig.SpyType.NONE);
+
+        public Builder spyByDefault() {
+            spyConfigBuilder = SpyConfig.builder(SpyConfig.SpyType.SPY);
+            return this;
+        }
+
+        public Builder spyConfig(Consumer<SpyConfig.Builder> 
spyConfigCustomizer) {
+            spyConfigCustomizer.accept(spyConfigBuilder);
+            return this;
+        }
+
+
+        public Builder reuseMockBookkeeperAndMetadataStores(PulsarTestContext 
otherContext) {
+            bookKeeperClient(otherContext.getBookKeeperClient());
+            
localMetadataStore(NonClosingProxyHandler.createNonClosingProxy(otherContext.getLocalMetadataStore(),
+                    MetadataStoreExtended.class
+            ));
+            
configurationMetadataStore(NonClosingProxyHandler.createNonClosingProxy(
+                    otherContext.getConfigurationMetadataStore(), 
MetadataStoreExtended.class
+            ));
+            return this;
+        }
+
+        public Builder withMockZookeeper() {
+            try {
+                MockZooKeeper mockZooKeeper = createMockZooKeeper();
+                registerCloseable(mockZooKeeper::shutdown);
+                MockZooKeeperSession mockZooKeeperSession = 
MockZooKeeperSession.newInstance(mockZooKeeper);
+                ZKMetadataStore zkMetadataStore = new 
ZKMetadataStore(mockZooKeeperSession);
+                registerCloseable(zkMetadataStore::close);
+                localMetadataStore(zkMetadataStore);
+                configurationMetadataStore(zkMetadataStore);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+            return this;
+        }
+
+        private static MockZooKeeper createMockZooKeeper() throws Exception {
+            MockZooKeeper zk = 
MockZooKeeper.newInstance(MoreExecutors.newDirectExecutorService());
+            List<ACL> dummyAclList = new ArrayList<>(0);
+
+            ZkUtils.createFullPathOptimistic(zk, 
"/ledgers/available/192.168.1.1:" + 5000,
+                    "".getBytes(StandardCharsets.UTF_8), dummyAclList, 
CreateMode.PERSISTENT);
+
+            zk.create("/ledgers/LAYOUT", 
"1\nflat:1".getBytes(StandardCharsets.UTF_8), dummyAclList,
+                    CreateMode.PERSISTENT);
+            return zk;
+        }
+
+        public Builder useTestPulsarResources() {
+            if (startable) {
+                throw new IllegalStateException("Cannot useTestPulsarResources 
when startable.");
+            }
+            useTestPulsarResources = true;
+            return this;
+        }
+
+        public Builder useTestPulsarResources(MetadataStore metadataStore) {
+            if (startable) {
+                throw new IllegalStateException("Cannot useTestPulsarResources 
when startable.");
+            }
+            useTestPulsarResources = true;
+            pulsarResourcesMetadataStore = metadataStore;
+            return this;
+        }
+
+        public Builder managedLedgerClients(BookKeeper bookKeeperClient,
+                                            ManagedLedgerFactory 
managedLedgerFactory) {
+            return managedLedgerClientFactory(
+                    
PulsarTestContext.createManagedLedgerClientFactory(bookKeeperClient, 
managedLedgerFactory));
+        }
+
+        public Builder brokerServiceFunction(
+                Function<PulsarService, BrokerService> brokerServiceFunction) {
+            this.brokerServiceFunction = brokerServiceFunction;
+            return this;
+        }
+    }
+
+    static abstract class AbstractCustomBuilder extends Builder {
+        AbstractCustomBuilder(boolean startable) {
+            super.startable = startable;
+        }
+
+        public Builder startable(boolean startable) {
+            throw new UnsupportedOperationException("Cannot change 
startability after builder creation.");
+        }
+
+        @Override
+        public final PulsarTestContext build() {
+            SpyConfig spyConfig = spyConfigBuilder.build();
+            if (super.config == null) {
+                ServiceConfiguration svcConfig = new ServiceConfiguration();
+                initializeConfig(svcConfig);
+                config(svcConfig);
+            }
+            initializeCommonPulsarServices(spyConfig);
+            initializePulsarServices(spyConfig, this);
+            if (super.startable) {
+                try {
+                    super.pulsarService.start();
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+            brokerService(super.pulsarService.getBrokerService());
+            return super.build();
+        }
+
+        protected void initializeConfig(ServiceConfiguration svcConfig) {
+            svcConfig.setBrokerShutdownTimeoutMs(0L);
+            
svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
+            svcConfig.setClusterName("pulsar-cluster");
+            svcConfig.setNumIOThreads(1);
+            svcConfig.setNumOrderedExecutorThreads(1);
+            svcConfig.setNumExecutorThreadPoolSize(2);
+            svcConfig.setNumCacheExecutorThreadPoolSize(2);
+            svcConfig.setNumHttpServerThreads(2);
+        }
+
+        private void initializeCommonPulsarServices(SpyConfig spyConfig) {
+            if (super.bookKeeperClient == null && 
super.managedLedgerClientFactory == null) {
+                if (super.executor == null) {
+                    OrderedExecutor createdExecutor = 
OrderedExecutor.newBuilder().numThreads(1)
+                            
.name(NonStartableTestPulsarService.class.getSimpleName() + 
"-executor").build();
+                    registerCloseable(() -> 
GracefulExecutorServicesShutdown.initiate()
+                            .timeout(Duration.ZERO)
+                            .shutdown(createdExecutor)
+                            .handle().get());
+                    super.executor = createdExecutor;
+                }
+                NonClosableMockBookKeeper mockBookKeeper;
+                try {
+                    mockBookKeeper =
+                            
spyConfig.getBookKeeperClient().spy(NonClosableMockBookKeeper.class, 
super.executor);
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+                registerCloseable(mockBookKeeper::reallyShutdown);
+                bookKeeperClient(mockBookKeeper);
+            }
+            if (super.bookKeeperClient == null && 
super.managedLedgerClientFactory != null) {
+                
bookKeeperClient(super.managedLedgerClientFactory.getBookKeeperClient());
+            }
+            if (super.localMetadataStore == null || 
super.configurationMetadataStore == null) {
+                try {
+                    MetadataStoreExtended store = 
MetadataStoreFactoryImpl.createExtended("memory:local",
+                            MetadataStoreConfig.builder().build());
+                    registerCloseable(store::close);
+                    if (super.localMetadataStore == null) {
+                        localMetadataStore(store);
+                    }
+                    if (super.configurationMetadataStore == null) {
+                        configurationMetadataStore(store);
+                    }
+                } catch (MetadataStoreException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+
+        protected abstract void initializePulsarServices(SpyConfig spyConfig, 
Builder builder);
+    }
+
+
+    /**
+     * Builder flavour created by {@code startableBuilder()}: it constructs a
+     * {@link StartableTestPulsarService} and rejects the settings that only apply to non-startable contexts.
+     */
+    static class StartableCustomBuilder extends AbstractCustomBuilder {
+        StartableCustomBuilder() {
+            super(true);
+        }
+
+        /**
+         * Not supported for startable contexts.
+         *
+         * @throws IllegalStateException always
+         */
+        @Override
+        public Builder managedLedgerClientFactory(ManagedLedgerStorage managedLedgerClientFactory) {
+            throw new IllegalStateException("Cannot set managedLedgerClientFactory when startable.");
+        }
+
+        /**
+         * Not supported for startable contexts.
+         *
+         * @throws IllegalStateException always
+         */
+        @Override
+        public Builder pulsarResources(PulsarResources pulsarResources) {
+            throw new IllegalStateException("Cannot set pulsarResources when startable.");
+        }
+
+        /**
+         * Instantiates {@link StartableTestPulsarService} according to the broker spy type, registers its
+         * {@code close()} for cleanup and stores it on the builder.
+         */
+        @Override
+        protected void initializePulsarServices(SpyConfig spyConfig, Builder builder) {
+            BookKeeperClientFactory clientFactory = new MockBookKeeperClientFactory(builder.bookKeeperClient);
+            SpyConfig.SpyType brokerSpyType = spyConfig.getPulsarBroker();
+            PulsarService createdService = brokerSpyType.spy(
+                    StartableTestPulsarService.class,
+                    builder.config,
+                    builder.localMetadataStore,
+                    builder.configurationMetadataStore,
+                    builder.compactor,
+                    builder.brokerInterceptor,
+                    clientFactory);
+            registerCloseable(createdService::close);
+            pulsarService(createdService);
+        }
+
+        /**
+         * Applies the common defaults, then sets the broker shutdown timeout to 5000 ms.
+         */
+        @Override
+        protected void initializeConfig(ServiceConfiguration svcConfig) {
+            super.initializeConfig(svcConfig);
+            svcConfig.setBrokerShutdownTimeoutMs(5000L);
+        }
+    }
+
+    /**
+     * Builder flavour created by {@code builder()}: it constructs a {@link NonStartableTestPulsarService} and
+     * fills in a mocked managed ledger factory and Pulsar resources when the caller supplied none.
+     */
+    static class NonStartableCustomBuilder extends AbstractCustomBuilder {
+
+        NonStartableCustomBuilder() {
+            super(false);
+        }
+
+        /**
+         * Fills in the missing managed ledger storage and Pulsar resources, then instantiates
+         * {@link NonStartableTestPulsarService} according to the broker spy type, registers its
+         * {@code close()} for cleanup and stores it on the builder.
+         */
+        @Override
+        protected void initializePulsarServices(SpyConfig spyConfig, Builder builder) {
+            if (builder.managedLedgerClientFactory == null) {
+                ManagedLedgerFactory mockedFactory = mock(ManagedLedgerFactory.class);
+                ManagedLedgerStorage storage =
+                        PulsarTestContext.createManagedLedgerClientFactory(builder.bookKeeperClient, mockedFactory);
+                managedLedgerClientFactory(storage);
+            }
+            if (builder.pulsarResources == null) {
+                pulsarResources(createPulsarResources(spyConfig.getPulsarResources(), builder));
+            }
+            BookKeeperClientFactory clientFactory = new MockBookKeeperClientFactory(builder.bookKeeperClient);
+            SpyConfig.SpyType brokerSpyType = spyConfig.getPulsarBroker();
+            PulsarService createdService = brokerSpyType.spy(
+                    NonStartableTestPulsarService.class,
+                    spyConfig,
+                    builder.config,
+                    builder.localMetadataStore,
+                    builder.configurationMetadataStore,
+                    builder.compactor,
+                    builder.brokerInterceptor,
+                    clientFactory,
+                    builder.pulsarResources,
+                    builder.managedLedgerClientFactory);
+            registerCloseable(createdService::close);
+            pulsarService(createdService);
+        }
+
+        /**
+         * Creates the Pulsar resources for the context: plain {@link PulsarResources}, or
+         * {@code TestPulsarResources} with separately created namespace and topic resources when
+         * {@code useTestPulsarResources} is set.
+         */
+        private PulsarResources createPulsarResources(SpyConfig.SpyType resourcesSpyType, Builder builder) {
+            if (!useTestPulsarResources) {
+                return resourcesSpyType.spy(PulsarResources.class, builder.localMetadataStore,
+                        builder.configurationMetadataStore);
+            }
+            // fall back to the configuration store when no dedicated store was requested
+            MetadataStore resourcesStore = pulsarResourcesMetadataStore != null
+                    ? pulsarResourcesMetadataStore
+                    : builder.configurationMetadataStore;
+            NamespaceResources namespaceResources =
+                    resourcesSpyType.spy(NamespaceResources.class, resourcesStore, 30);
+            TopicResources topicResources = resourcesSpyType.spy(TopicResources.class, resourcesStore);
+            return resourcesSpyType.spy(
+                    NonStartableTestPulsarService.TestPulsarResources.class,
+                    builder.localMetadataStore,
+                    builder.configurationMetadataStore,
+                    topicResources,
+                    namespaceResources);
+        }
+    }
+
+    /**
+     * Wraps a BookKeeper client and a managed ledger factory in a minimal {@link ManagedLedgerStorage}.
+     * <p>
+     * The returned storage only hands back the two given objects; {@code initialize} and {@code close} do
+     * nothing, and every call to {@code getStatsProvider} yields a new {@link NullStatsProvider}.
+     *
+     * @param bookKeeperClient     client returned by {@code getBookKeeperClient()}
+     * @param managedLedgerFactory factory returned by {@code getManagedLedgerFactory()}
+     * @return the wrapping storage
+     */
+    @NotNull
+    private static ManagedLedgerStorage createManagedLedgerClientFactory(BookKeeper bookKeeperClient,
+                                                                         ManagedLedgerFactory managedLedgerFactory) {
+        ManagedLedgerStorage fixedStorage = new ManagedLedgerStorage() {
+
+            @Override
+            public void initialize(ServiceConfiguration conf,
+                                   MetadataStoreExtended metadataStore,
+                                   BookKeeperClientFactory bookkeeperProvider,
+                                   EventLoopGroup eventLoopGroup) throws Exception {
+                // nothing to set up: both collaborators are supplied ready-made
+            }
+
+            @Override
+            public BookKeeper getBookKeeperClient() {
+                return bookKeeperClient;
+            }
+
+            @Override
+            public ManagedLedgerFactory getManagedLedgerFactory() {
+                return managedLedgerFactory;
+            }
+
+            @Override
+            public StatsProvider getStatsProvider() {
+                return new NullStatsProvider();
+            }
+
+            @Override
+            public void close() throws IOException {
+                // nothing to release here
+            }
+        };
+        return fixedStorage;
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/SpyConfig.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/SpyConfig.java
new file mode 100644
index 00000000000..2e3b863cc8a
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/SpyConfig.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.testcontext;
+
+import lombok.Value;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.mockito.Mockito;
+import org.mockito.internal.creation.instance.ConstructorInstantiator;
+
+@lombok.Builder(builderClassName = "Builder", toBuilder = true)
+@Value
+/**
+ * Selects, per component of the test context, whether the component is used as a plain instance or wrapped
+ * in a Mockito spy.
+ */
+public class SpyConfig {
+    /**
+     * How a component instance is obtained.
+     */
+    public enum SpyType {
+        /** Use the plain object, no spy. */
+        NONE,
+        /** Wrap with the {@link BrokerTestUtil} helpers that do not record invocations. */
+        SPY,
+        /** Wrap with a spy that also records invocations. */
+        SPY_ALSO_INVOCATIONS;
+
+        /**
+         * Wraps an existing object according to this spy type.
+         *
+         * @param object the instance to wrap; may be {@code null}
+         * @return {@code null} for a {@code null} input, the object itself for {@link #NONE}, otherwise a spy
+         *         created by {@code BrokerTestUtil.spyWithoutRecordingInvocations} ({@link #SPY}) or
+         *         {@code Mockito.spy} ({@link #SPY_ALSO_INVOCATIONS})
+         */
+        public <T> T spy(T object) {
+            if (object == null) {
+                return null;
+            }
+            switch (this) {
+                case NONE:
+                    return object;
+                case SPY:
+                    return BrokerTestUtil.spyWithoutRecordingInvocations(object);
+                case SPY_ALSO_INVOCATIONS:
+                    return Mockito.spy(object);
+                default:
+                    throw new UnsupportedOperationException("Unknown spy type: " + this);
+            }
+        }
+
+        /**
+         * Instantiates a class with the given constructor arguments according to this spy type.
+         *
+         * @param clazz the class to instantiate
+         * @param args  the constructor arguments
+         * @return for {@link #NONE} an instance created through Mockito's {@code ConstructorInstantiator};
+         *         otherwise the result of the matching {@code BrokerTestUtil.spyWithClassAndConstructorArgs*}
+         *         helper
+         */
+        public <T> T spy(Class<T> clazz, Object... args) {
+            switch (this) {
+                case NONE:
+                    // Use Mockito's internal class to instantiate the object
+                    return new ConstructorInstantiator(false, args).newInstance(clazz);
+                case SPY:
+                    return BrokerTestUtil.spyWithClassAndConstructorArgs(clazz, args);
+                case SPY_ALSO_INVOCATIONS:
+                    return BrokerTestUtil.spyWithClassAndConstructorArgsRecordingInvocations(clazz, args);
+                default:
+                    throw new UnsupportedOperationException("Unknown spy type: " + this);
+            }
+        }
+    }
+
+    private final SpyType pulsarBroker;
+    private final SpyType pulsarResources;
+    private final SpyType brokerService;
+    private final SpyType bookKeeperClient;
+
+    /**
+     * Creates a builder in which every component defaults to {@link SpyType#NONE}.
+     */
+    public static Builder builder() {
+        return builder(SpyType.NONE);
+    }
+
+    /**
+     * Creates a builder in which every component is preset to the given spy type.
+     *
+     * @param defaultSpyType the spy type applied to all four components
+     * @return the pre-populated builder
+     */
+    public static Builder builder(SpyType defaultSpyType) {
+        Builder spyConfigBuilder = new Builder();
+        spyConfigBuilder.pulsarBroker(defaultSpyType);
+        spyConfigBuilder.pulsarResources(defaultSpyType);
+        spyConfigBuilder.brokerService(defaultSpyType);
+        spyConfigBuilder.bookKeeperClient(defaultSpyType);
+        return spyConfigBuilder;
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/StartableTestPulsarService.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/StartableTestPulsarService.java
new file mode 100644
index 00000000000..a171ce97c11
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/StartableTestPulsarService.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.testcontext;
+
+import org.apache.pulsar.broker.BookKeeperClientFactory;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.intercept.BrokerInterceptor;
+import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+
+/**
+ * Test Pulsar service that adds nothing to {@link AbstractTestPulsarService} beyond a constructor forwarding
+ * all of its arguments to the parent.
+ */
+class StartableTestPulsarService extends AbstractTestPulsarService {
+    /**
+     * Forwards every argument, unchanged and in the same order, to the parent constructor.
+     */
+    public StartableTestPulsarService(
+            ServiceConfiguration config,
+            MetadataStoreExtended localMetadataStore,
+            MetadataStoreExtended configurationMetadataStore,
+            Compactor compactor,
+            BrokerInterceptor brokerInterceptor,
+            BookKeeperClientFactory bookKeeperClientFactory) {
+        super(
+                config,
+                localMetadataStore,
+                configurationMetadataStore,
+                compactor,
+                brokerInterceptor,
+                bookKeeperClientFactory);
+    }
+}

Reply via email to