This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6112b6501e3 [improve][test] Reduce the use of Mockito spies/mocks in 
tests (#19326)
6112b6501e3 is described below

commit 6112b6501e32b9d46612ebbc7a18482661894053
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Jan 25 22:38:30 2023 +0200

    [improve][test] Reduce the use of Mockito spies/mocks in tests (#19326)
---
 .../apache/pulsar/broker/TestPulsarService.java    |  39 +-
 .../broker/loadbalance/SimpleBrokerStartTest.java  |   5 +-
 .../loadbalance/SimpleLoadManagerImplTest.java     |   3 +-
 .../broker/lookup/http/HttpTopicLookupv2Test.java  |  12 +-
 .../broker/namespace/OwnershipCacheTest.java       |   6 +-
 .../PersistentDispatcherFailoverConsumerTest.java  |   2 +-
 .../service/PersistentTopicConcurrentTest.java     |   3 +-
 .../pulsar/broker/service/ServerCnxTest.java       | 462 ++++++++++-----------
 .../persistent/PersistentSubscriptionTest.java     |   3 +-
 .../broker/web/ProcessHandlerFilterTest.java       |  14 +-
 .../client/api/ClientDeduplicationFailureTest.java |   4 +-
 11 files changed, 284 insertions(+), 269 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/TestPulsarService.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TestPulsarService.java
index 873be15b8ff..3c998515cb1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/TestPulsarService.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TestPulsarService.java
@@ -22,13 +22,16 @@ package org.apache.pulsar.broker;
 import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
 import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgsRecordingInvocations;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import java.io.IOException;
 import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import lombok.Builder;
 import lombok.Getter;
 import lombok.ToString;
@@ -37,6 +40,7 @@ import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.stats.NullStatsProvider;
 import org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.resources.NamespaceResources;
 import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.broker.resources.TopicResources;
@@ -49,6 +53,7 @@ import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
 import org.apache.pulsar.broker.transaction.TransactionTestBase;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown;
 import org.apache.pulsar.compaction.Compactor;
 import org.apache.pulsar.metadata.api.MetadataStore;
@@ -168,7 +173,7 @@ public class TestPulsarService extends PulsarService {
                     throw new RuntimeException(e);
                 }
                 if (super.config == null) {
-                    ServiceConfiguration svcConfig = 
spy(ServiceConfiguration.class);
+                    ServiceConfiguration svcConfig = new 
ServiceConfiguration();
                     svcConfig.setBrokerShutdownTimeoutMs(0L);
                     
svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
                     svcConfig.setClusterName("pulsar-cluster");
@@ -225,9 +230,9 @@ public class TestPulsarService extends PulsarService {
                         brokerServiceFunction(pulsarService -> {
                             try {
                                 if (!super.useSpies) {
-                                    return new BrokerService(pulsarService, 
super.eventLoopGroup);
+                                    return new 
TestBrokerService(pulsarService, super.eventLoopGroup);
                                 } else {
-                                    return 
spyWithClassAndConstructorArgs(BrokerService.class, pulsarService,
+                                    return 
spyWithClassAndConstructorArgs(TestBrokerService.class, pulsarService,
                                             super.eventLoopGroup);
                                 }
                             } catch (Exception e) {
@@ -312,6 +317,18 @@ public class TestPulsarService extends PulsarService {
         }
     }
 
+    private static class TestBrokerService extends BrokerService {
+
+        TestBrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) 
throws Exception {
+            super(pulsar, eventLoopGroup);
+        }
+
+        @Override
+        protected CompletableFuture<Map<String, String>> 
fetchTopicPropertiesAsync(TopicName topicName) {
+            return CompletableFuture.completedFuture(Collections.emptyMap());
+        }
+    }
+
     private final MetadataStoreExtended localMetadataStore;
     private final MetadataStoreExtended configurationMetadataStore;
     private final PulsarResources pulsarResources;
@@ -326,6 +343,8 @@ public class TestPulsarService extends PulsarService {
 
     private final PulsarClient pulsarClient;
 
+    private final NamespaceService namespaceService;
+
     protected TestPulsarService(ServiceConfiguration config, 
MetadataStoreExtended localMetadataStore,
                                 MetadataStoreExtended 
configurationMetadataStore, PulsarResources pulsarResources,
                                 ManagedLedgerStorage 
managedLedgerClientFactory,
@@ -341,6 +360,18 @@ public class TestPulsarService extends PulsarService {
         this.compactor = compactor;
         this.schemaRegistryService = 
spyWithClassAndConstructorArgs(DefaultSchemaRegistryService.class);
         this.pulsarClient = mock(PulsarClientImpl.class);
+        this.namespaceService = mock(NamespaceService.class);
+        try {
+            startNamespaceService();
+        } catch (PulsarServerException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+
+    @Override
+    public Supplier<NamespaceService> getNamespaceServiceProvider() throws 
PulsarServerException {
+        return () -> namespaceService;
     }
 
     @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java
index 7bc0d564f56..6de2eb90f12 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.broker.loadbalance;
 
-import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import java.util.Optional;
@@ -44,7 +43,7 @@ public class SimpleBrokerStartTest {
         LocalBookkeeperEnsemble bkEnsemble = new LocalBookkeeperEnsemble(3, 0, 
() -> 0);
         bkEnsemble.start();
         // Start broker
-        ServiceConfiguration config = spy(ServiceConfiguration.class);
+        ServiceConfiguration config = new ServiceConfiguration();
         config.setClusterName("use");
         config.setWebServicePort(Optional.of(0));
         config.setMetadataStoreUrl("zk:127.0.0.1:" + 
bkEnsemble.getZookeeperPort());
@@ -72,7 +71,7 @@ public class SimpleBrokerStartTest {
         LocalBookkeeperEnsemble bkEnsemble = new LocalBookkeeperEnsemble(3, 0, 
() -> 0);
         bkEnsemble.start();
         // Start broker
-        ServiceConfiguration config = spy(ServiceConfiguration.class);
+        ServiceConfiguration config = new ServiceConfiguration();
         config.setClusterName("use");
         config.setWebServicePort(Optional.of(0));
         config.setMetadataStoreUrl("zk:127.0.0.1:" + 
bkEnsemble.getZookeeperPort());
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
index b267e5e1856..c4898786e3e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.broker.loadbalance;
 import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgsRecordingInvocations;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertEquals;
@@ -105,7 +104,7 @@ public class SimpleLoadManagerImplTest {
         bkEnsemble.start();
 
         // Start broker 1
-        ServiceConfiguration config1 = spy(ServiceConfiguration.class);
+        ServiceConfiguration config1 = new ServiceConfiguration();
         config1.setClusterName("use");
         config1.setWebServicePort(Optional.of(0));
         config1.setMetadataStoreUrl("zk:127.0.0.1:" + 
bkEnsemble.getZookeeperPort());
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java
index 77541fb5cb4..6a6065bc289 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java
@@ -78,7 +78,7 @@ public class HttpTopicLookupv2Test {
         pulsar = mock(PulsarService.class);
         ns = mock(NamespaceService.class);
         auth = mock(AuthorizationService.class);
-        config = spy(ServiceConfiguration.class);
+        config = new ServiceConfiguration();
         config.setClusterName("use");
         clusters = new TreeSet<>();
         clusters.add("use");
@@ -121,7 +121,7 @@ public class HttpTopicLookupv2Test {
         uriField.set(destLookup, uriInfo);
         URI uri = 
URI.create("http://localhost:8080/lookup/v2/destination/topic/myprop/usc/ns2/topic1";);
         doReturn(uri).when(uriInfo).getRequestUri();
-        doReturn(true).when(config).isAuthorizationEnabled();
+        config.setAuthorizationEnabled(true);
 
         AsyncResponse asyncResponse = mock(AsyncResponse.class);
         destLookup.lookupTopicAsync(asyncResponse, 
TopicDomain.persistent.value(), "myprop", "usc", "ns2", "topic1", false, null, 
null);
@@ -146,7 +146,7 @@ public class HttpTopicLookupv2Test {
         uriField.set(destLookup, uriInfo);
         URI uri = 
URI.create("http://localhost:8080/lookup/v2/destination/topic/myprop/usc/ns2/topic1";);
         doReturn(uri).when(uriInfo).getRequestUri();
-        doReturn(true).when(config).isAuthorizationEnabled();
+        config.setAuthorizationEnabled(true);
 
         NamespaceService namespaceService = pulsar.getNamespaceService();
         CompletableFuture<Boolean> future = new CompletableFuture<>();
@@ -173,7 +173,7 @@ public class HttpTopicLookupv2Test {
             return CompletableFuture.completedFuture(null);
         }
     }
-    
+
     @Test
     public void testNotEnoughLookupPermits() throws Exception {
 
@@ -190,7 +190,7 @@ public class HttpTopicLookupv2Test {
         uriField.set(destLookup, uriInfo);
         URI uri = 
URI.create("http://localhost:8080/lookup/v2/destination/topic/myprop/usc/ns2/topic1";);
         doReturn(uri).when(uriInfo).getRequestUri();
-        doReturn(true).when(config).isAuthorizationEnabled();
+        config.setAuthorizationEnabled(true);
 
         AsyncResponse asyncResponse1 = mock(AsyncResponse.class);
         destLookup.lookupTopicAsync(asyncResponse1, 
TopicDomain.persistent.value(), "myprop", "usc", "ns2", "topic1", false,null, 
null);
@@ -220,7 +220,7 @@ public class HttpTopicLookupv2Test {
         uriField.setAccessible(true);
         UriInfo uriInfo = mock(UriInfo.class);
         uriField.set(destLookup, uriInfo);
-        doReturn(false).when(config).isAuthorizationEnabled();
+        config.setAuthorizationEnabled(false);
         AsyncResponse asyncResponse = mock(AsyncResponse.class);
         ArgumentCaptor<RestException> arg = 
ArgumentCaptor.forClass(RestException.class);
 //        // Test policy not found
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
index 38a1252e2ee..9e3d9e3a413 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
@@ -82,7 +82,7 @@ public class OwnershipCacheTest {
         final int port = 8080;
         selfBrokerUrl = "tcp://localhost:" + port;
         pulsar = mock(PulsarService.class);
-        config = mock(ServiceConfiguration.class);
+        config = new ServiceConfiguration();
         executor = 
OrderedScheduler.newSchedulerBuilder().numThreads(1).name("test").build();
         zookeeperServer = new ZookeeperServerTest(0);
         zookeeperServer.start();
@@ -106,8 +106,8 @@ public class OwnershipCacheTest {
 
         doReturn(config).when(pulsar).getConfiguration();
         doReturn(nsService).when(pulsar).getNamespaceService();
-        doReturn(Optional.of(port)).when(config).getBrokerServicePort();
-        doReturn(Optional.empty()).when(config).getWebServicePort();
+        config.setBrokerServicePort(Optional.of(port));
+        config.setWebServicePort(Optional.empty());
         doReturn(brokerService).when(pulsar).getBrokerService();
         doReturn(selfBrokerUrl).when(pulsar).getBrokerServiceUrl();
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index 2bebac14023..4dfaade33ca 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -98,7 +98,7 @@ public class PersistentDispatcherFailoverConsumerTest {
 
     @BeforeMethod
     public void setup() throws Exception {
-        ServiceConfiguration svcConfig = spy(ServiceConfiguration.class);
+        ServiceConfiguration svcConfig = new ServiceConfiguration();
         svcConfig.setBrokerShutdownTimeoutMs(0L);
         svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         svcConfig.setClusterName("pulsar-cluster");
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
index 4bf0481e0d0..6ce6de1916f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
@@ -22,7 +22,6 @@ import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructor
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertFalse;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
@@ -78,7 +77,7 @@ public class PersistentTopicConcurrentTest extends 
MockedBookKeeperTestCase {
     @BeforeMethod
     public void setup(Method m) throws Exception {
         super.setUp(m);
-        ServiceConfiguration svcConfig = spy(ServiceConfiguration.class);
+        ServiceConfiguration svcConfig = new ServiceConfiguration();
         svcConfig.setBrokerShutdownTimeoutMs(0L);
         svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         @Cleanup
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 665ac51446b..c4088d2e124 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -16,23 +16,20 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.pulsar.broker.service;
 
 import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
-import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
-import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyObject;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.CALLS_REAL_METHODS;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.matches;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -44,14 +41,11 @@ import static org.testng.Assert.assertTrue;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandler;
-import io.netty.channel.EventLoopGroup;
 import io.netty.channel.embedded.EmbeddedChannel;
-import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.nio.charset.StandardCharsets;
-import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -65,7 +59,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
-import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
@@ -75,27 +68,22 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
-import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.TestPulsarService;
 import org.apache.pulsar.broker.TransactionMetadataStoreService;
 import org.apache.pulsar.broker.auth.MockAuthenticationProvider;
 import org.apache.pulsar.broker.auth.MockMultiStageAuthenticationProvider;
-import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authentication.AuthenticationProvider;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.authentication.AuthenticationState;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
-import org.apache.pulsar.broker.intercept.BrokerInterceptor;
 import org.apache.pulsar.broker.namespace.NamespaceService;
-import org.apache.pulsar.broker.resources.NamespaceResources;
-import org.apache.pulsar.broker.resources.PulsarResources;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
 import org.apache.pulsar.broker.service.ServerCnx.State;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
-import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService;
 import org.apache.pulsar.broker.service.utils.ClientChannelHelper;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.AuthData;
@@ -138,17 +126,11 @@ import 
org.apache.pulsar.common.protocol.Commands.ChecksumType;
 import org.apache.pulsar.common.protocol.PulsarHandler;
 import org.apache.pulsar.common.topics.TopicList;
 import org.apache.pulsar.common.util.FutureUtil;
-import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
-import org.apache.pulsar.common.util.netty.EventLoopUtil;
-import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
-import org.apache.pulsar.metadata.impl.ZKMetadataStore;
-import org.apache.zookeeper.ZooKeeper;
 import org.awaitility.Awaitility;
 import org.mockito.ArgumentCaptor;
 import org.mockito.MockedStatic;
 import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -160,12 +142,12 @@ public class ServerCnxTest {
     protected EmbeddedChannel channel;
     private ServiceConfiguration svcConfig;
     private ServerCnx serverCnx;
+
+    protected TestPulsarService.Factory testPulsarServiceFactory;
+
+    protected PulsarService pulsar;
     protected BrokerService brokerService;
-    private ManagedLedgerFactory mlFactoryMock;
     private ClientChannelHelper clientChannelHelper;
-    private PulsarService pulsar;
-    private MetadataStoreExtended store;
-    private NamespaceResources namespaceResources;
     protected NamespaceService namespaceService;
     private final int currentProtocolVersion = 
ProtocolVersion.values()[ProtocolVersion.values().length - 1]
             .getValue();
@@ -175,7 +157,8 @@ public class ServerCnxTest {
     private final String nonOwnedTopicName = 
"persistent://prop/use/ns-abc/success-not-owned-topic";
     private final String encryptionRequiredTopicName = 
"persistent://prop/use/ns-abc/successEncryptionRequiredTopic";
     private final String successSubName = "successSub";
-    private final String nonExistentTopicName = 
"persistent://nonexistent-prop/nonexistent-cluster/nonexistent-namespace/successNonExistentTopic";
+    private final String nonExistentTopicName =
+            
"persistent://nonexistent-prop/nonexistent-cluster/nonexistent-namespace/successNonExistentTopic";
     private final String topicWithNonLocalCluster = 
"persistent://prop/usw/ns-abc/successTopic";
     private final List<String> matchingTopics = Arrays.asList(
             "persistent://use/ns-abc/topic-1",
@@ -188,55 +171,26 @@ public class ServerCnxTest {
 
     private ManagedLedger ledgerMock;
     private ManagedCursor cursorMock;
-    private OrderedExecutor executor;
-    private EventLoopGroup eventLoopGroup;
+
 
     @BeforeMethod(alwaysRun = true)
     public void setup() throws Exception {
-        eventLoopGroup = new NioEventLoopGroup();
-        executor = OrderedExecutor.newBuilder().numThreads(1).build();
-        svcConfig = spy(ServiceConfiguration.class);
+        svcConfig = new ServiceConfiguration();
         svcConfig.setBrokerShutdownTimeoutMs(0L);
         svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
-        svcConfig.setClusterName("pulsar-cluster");
-        pulsar = spyWithClassAndConstructorArgs(PulsarService.class, 
svcConfig);
-        doReturn(new 
DefaultSchemaRegistryService()).when(pulsar).getSchemaRegistryService();
-
         svcConfig.setKeepAliveIntervalSeconds(inSec(1, TimeUnit.SECONDS));
         svcConfig.setBacklogQuotaCheckEnabled(false);
-        doReturn(svcConfig).when(pulsar).getConfiguration();
-        
doReturn(mock(PulsarResources.class)).when(pulsar).getPulsarResources();
-
-        doReturn("use").when(svcConfig).getClusterName();
-
-        mlFactoryMock = mock(ManagedLedgerFactory.class);
-        doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
-
-        ZooKeeper mockZk = createMockZooKeeper();
-        doReturn(createMockBookKeeper(executor))
-            .when(pulsar).getBookKeeperClient();
-
-        store = new ZKMetadataStore(mockZk);
-
-        doReturn(store).when(pulsar).getLocalMetadataStore();
-        doReturn(store).when(pulsar).getConfigurationMetadataStore();
+        svcConfig.setClusterName("use");
+        testPulsarServiceFactory = TestPulsarService.Factory.builder()
+                .config(svcConfig)
+                .useSpies(true)
+                .build();
+        pulsar = testPulsarServiceFactory.getPulsarService();
 
-        brokerService = spyWithClassAndConstructorArgs(BrokerService.class, 
pulsar, eventLoopGroup);
-        BrokerInterceptor interceptor = mock(BrokerInterceptor.class);
-        doReturn(interceptor).when(brokerService).getInterceptor();
-        doReturn(brokerService).when(pulsar).getBrokerService();
-        
doReturn(CompletableFuture.completedFuture(Collections.emptyMap())).when(brokerService).fetchTopicPropertiesAsync(anyObject());
+        brokerService = testPulsarServiceFactory.getBrokerService();
 
-        doReturn(executor).when(pulsar).getOrderedExecutor();
-
-        PulsarResources pulsarResources = 
spyWithClassAndConstructorArgs(PulsarResources.class, store, store);
-        namespaceResources = 
spyWithClassAndConstructorArgs(NamespaceResources.class, store, 30);
-        
doReturn(namespaceResources).when(pulsarResources).getNamespaceResources();
-        doReturn(pulsarResources).when(pulsar).getPulsarResources();
-
-        namespaceService = mock(NamespaceService.class);
+        namespaceService = pulsar.getNamespaceService();
         
doReturn(CompletableFuture.completedFuture(null)).when(namespaceService).getBundleAsync(any());
-        doReturn(namespaceService).when(pulsar).getNamespaceService();
         doReturn(true).when(namespaceService).isServiceUnitOwned(any());
         doReturn(true).when(namespaceService).isServiceUnitActive(any());
         
doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).isServiceUnitActiveAsync(any());
@@ -263,14 +217,10 @@ public class ServerCnxTest {
         if (channel != null) {
             channel.close();
         }
-        brokerService.close();
-        pulsar.close();
-        GracefulExecutorServicesShutdown.initiate()
-                .timeout(Duration.ZERO)
-                .shutdown(executor)
-                .handle().get();
-        EventLoopUtil.shutdownGracefully(eventLoopGroup).get();
-        store.close();
+        if (testPulsarServiceFactory != null) {
+            testPulsarServiceFactory.close();
+            testPulsarServiceFactory = null;
+        }
     }
 
     @Test(timeOut = 30000)
@@ -366,7 +316,8 @@ public class ServerCnxTest {
         assertEquals(serverCnx.getState(), State.Start);
 
         // test server response to CONNECT
-        ByteBuf clientCommand = Commands.newConnect("none", "", 
ProtocolVersion.v0.getValue(), null, null, null, null, null);
+        ByteBuf clientCommand =
+                Commands.newConnect("none", "", ProtocolVersion.v0.getValue(), 
null, null, null, null, null);
         channel.writeInbound(clientCommand);
 
         assertEquals(serverCnx.getState(), State.Connected);
@@ -408,22 +359,21 @@ public class ServerCnxTest {
         AuthenticationService authenticationService = 
mock(AuthenticationService.class);
         AuthenticationProvider authenticationProvider = 
mock(AuthenticationProvider.class);
         AuthenticationState authenticationState = 
mock(AuthenticationState.class);
-        AuthenticationDataSource authenticationDataSource = 
mock(AuthenticationDataSource.class);
         AuthData authData = AuthData.of(null);
 
         
doReturn(authenticationService).when(brokerService).getAuthenticationService();
         
doReturn(authenticationProvider).when(authenticationService).getAuthenticationProvider(Mockito.anyString());
         doReturn(authenticationState).when(authenticationProvider)
-            .newAuthState(Mockito.any(), Mockito.any(), Mockito.any());
+                .newAuthState(Mockito.any(), Mockito.any(), Mockito.any());
         doReturn(authData).when(authenticationState)
-            .authenticate(authData);
+                .authenticate(authData);
         doReturn(true).when(authenticationState)
-            .isComplete();
+                .isComplete();
 
         doReturn("appid1").when(authenticationState)
-            .getAuthRole();
+                .getAuthRole();
 
-        doReturn(true).when(brokerService).isAuthenticationEnabled();
+        svcConfig.setAuthenticationEnabled(true);
 
         resetChannel();
         assertTrue(channel.isActive());
@@ -443,7 +393,7 @@ public class ServerCnxTest {
         AuthenticationService authenticationService = 
mock(AuthenticationService.class);
         
doReturn(authenticationService).when(brokerService).getAuthenticationService();
         
doReturn(Optional.empty()).when(authenticationService).getAnonymousUserRole();
-        doReturn(true).when(brokerService).isAuthenticationEnabled();
+        svcConfig.setAuthenticationEnabled(true);
 
         resetChannel();
         assertTrue(channel.isActive());
@@ -474,7 +424,7 @@ public class ServerCnxTest {
         assertTrue(channel.isActive());
         assertEquals(serverCnx.getState(), State.Start);
 
-        ByteBuf clientCommand = Commands.newConnect(authMethodName, 
"pass.proxy", 1,null,
+        ByteBuf clientCommand = Commands.newConnect(authMethodName, 
"pass.proxy", 1, null,
                 null, "client", "fail", authMethodName);
         channel.writeInbound(clientCommand);
 
@@ -513,7 +463,8 @@ public class ServerCnxTest {
         assertTrue(challenge1 instanceof CommandAuthChallenge);
 
         // Trigger another AuthChallenge to verify that code path continues to 
challenge
-        ByteBuf authResponse1 = Commands.newAuthResponse(authMethodName, 
AuthData.of("challenge.client".getBytes()), 1, "1");
+        ByteBuf authResponse1 =
+                Commands.newAuthResponse(authMethodName, 
AuthData.of("challenge.client".getBytes()), 1, "1");
         channel.writeInbound(authResponse1);
 
         Object challenge2 = getResponse();
@@ -611,10 +562,11 @@ public class ServerCnxTest {
     @Test(timeOut = 30000)
     public void testProducerCommandWithAuthorizationPositive() throws 
Exception {
         AuthorizationService authorizationService = 
mock(AuthorizationService.class);
-        
doReturn(CompletableFuture.completedFuture(true)).when(authorizationService).allowTopicOperationAsync(Mockito.any(),
-                Mockito.any(), Mockito.any(), Mockito.any());
+        
doReturn(CompletableFuture.completedFuture(true)).when(authorizationService)
+                .allowTopicOperationAsync(Mockito.any(),
+                        Mockito.any(), Mockito.any(), Mockito.any());
         
doReturn(authorizationService).when(brokerService).getAuthorizationService();
-        doReturn(true).when(brokerService).isAuthenticationEnabled();
+        svcConfig.setAuthenticationEnabled(true);
         resetChannel();
         setChannelConnected();
 
@@ -635,16 +587,19 @@ public class ServerCnxTest {
 
     @Test(timeOut = 30000)
     public void testNonExistentTopic() throws Exception {
-        AuthorizationService authorizationService = 
spyWithClassAndConstructorArgs(AuthorizationService.class, svcConfig, 
pulsar.getPulsarResources());
+        AuthorizationService authorizationService =
+                spyWithClassAndConstructorArgs(AuthorizationService.class, 
svcConfig, pulsar.getPulsarResources());
         
doReturn(authorizationService).when(brokerService).getAuthorizationService();
-        doReturn(true).when(brokerService).isAuthorizationEnabled();
+        svcConfig.setAuthorizationEnabled(true);
         svcConfig.setAuthorizationEnabled(true);
         Field providerField = 
AuthorizationService.class.getDeclaredField("provider");
         providerField.setAccessible(true);
-        PulsarAuthorizationProvider authorizationProvider = 
spyWithClassAndConstructorArgs(PulsarAuthorizationProvider.class, svcConfig,
-                pulsar.getPulsarResources());
+        PulsarAuthorizationProvider authorizationProvider =
+                
spyWithClassAndConstructorArgs(PulsarAuthorizationProvider.class, svcConfig,
+                        pulsar.getPulsarResources());
         providerField.set(authorizationService, authorizationProvider);
-        
doReturn(CompletableFuture.completedFuture(false)).when(authorizationProvider).isSuperUser(Mockito.anyString(),
 Mockito.any(), Mockito.any());
+        
doReturn(CompletableFuture.completedFuture(false)).when(authorizationProvider)
+                .isSuperUser(Mockito.anyString(), Mockito.any(), 
Mockito.any());
 
         // Test producer creation
         resetChannel();
@@ -659,7 +614,8 @@ public class ServerCnxTest {
         resetChannel();
         setChannelConnected();
         ByteBuf newSubscribeCmd = Commands.newSubscribe(nonExistentTopicName, 
//
-                successSubName, 1 /* consumer id */, 1 /* request id */, 
SubType.Exclusive, 0, "test" /* consumer name */, 0);
+                successSubName, 1 /* consumer id */, 1 /* request id */, 
SubType.Exclusive, 0,
+                "test" /* consumer name */, 0);
         channel.writeInbound(newSubscribeCmd);
         assertTrue(getResponse() instanceof CommandError);
         channel.finish();
@@ -668,18 +624,23 @@ public class ServerCnxTest {
     @Test(timeOut = 30000)
     public void testClusterAccess() throws Exception {
         svcConfig.setAuthorizationEnabled(true);
-        AuthorizationService authorizationService = 
spyWithClassAndConstructorArgs(AuthorizationService.class, svcConfig, 
pulsar.getPulsarResources());
+        AuthorizationService authorizationService =
+                spyWithClassAndConstructorArgs(AuthorizationService.class, 
svcConfig, pulsar.getPulsarResources());
         Field providerField = 
AuthorizationService.class.getDeclaredField("provider");
         providerField.setAccessible(true);
-        PulsarAuthorizationProvider authorizationProvider = 
spyWithClassAndConstructorArgs(PulsarAuthorizationProvider.class, svcConfig,
-                pulsar.getPulsarResources());
+        PulsarAuthorizationProvider authorizationProvider =
+                
spyWithClassAndConstructorArgs(PulsarAuthorizationProvider.class, svcConfig,
+                        pulsar.getPulsarResources());
         providerField.set(authorizationService, authorizationProvider);
         
doReturn(authorizationService).when(brokerService).getAuthorizationService();
-        doReturn(true).when(brokerService).isAuthorizationEnabled();
-        
doReturn(CompletableFuture.completedFuture(false)).when(authorizationProvider).isSuperUser(Mockito.anyString(),
 Mockito.any(), Mockito.any());
-        
doReturn(CompletableFuture.completedFuture(false)).when(authorizationProvider).validateTenantAdminAccess(Mockito.anyString(),
 Mockito.any(), Mockito.any());
-        
doReturn(CompletableFuture.completedFuture(true)).when(authorizationProvider).checkPermission(any(TopicName.class),
 Mockito.anyString(),
-                any(AuthAction.class));
+        svcConfig.setAuthorizationEnabled(true);
+        
doReturn(CompletableFuture.completedFuture(false)).when(authorizationProvider)
+                .isSuperUser(Mockito.anyString(), Mockito.any(), 
Mockito.any());
+        
doReturn(CompletableFuture.completedFuture(false)).when(authorizationProvider)
+                .validateTenantAdminAccess(Mockito.anyString(), Mockito.any(), 
Mockito.any());
+        
doReturn(CompletableFuture.completedFuture(true)).when(authorizationProvider)
+                .checkPermission(any(TopicName.class), Mockito.anyString(),
+                        any(AuthAction.class));
 
         resetChannel();
         setChannelConnected();
@@ -699,14 +660,18 @@ public class ServerCnxTest {
 
     @Test(timeOut = 30000)
     public void testNonExistentTopicSuperUserAccess() throws Exception {
-        AuthorizationService authorizationService = 
spyWithClassAndConstructorArgs(AuthorizationService.class, svcConfig, 
pulsar.getPulsarResources());
+        AuthorizationService authorizationService =
+                spyWithClassAndConstructorArgs(AuthorizationService.class, 
svcConfig, pulsar.getPulsarResources());
         
doReturn(authorizationService).when(brokerService).getAuthorizationService();
-        doReturn(true).when(brokerService).isAuthorizationEnabled();
+        svcConfig.setAuthorizationEnabled(true);
         Field providerField = 
AuthorizationService.class.getDeclaredField("provider");
         providerField.setAccessible(true);
-        PulsarAuthorizationProvider authorizationProvider = 
spyWithClassAndConstructorArgs(PulsarAuthorizationProvider.class, svcConfig, 
pulsar.getPulsarResources());
+        PulsarAuthorizationProvider authorizationProvider =
+                
spyWithClassAndConstructorArgs(PulsarAuthorizationProvider.class, svcConfig,
+                        pulsar.getPulsarResources());
         providerField.set(authorizationService, authorizationProvider);
-        
doReturn(CompletableFuture.completedFuture(true)).when(authorizationProvider).isSuperUser(Mockito.anyString(),
 Mockito.any(), Mockito.any());
+        
doReturn(CompletableFuture.completedFuture(true)).when(authorizationProvider)
+                .isSuperUser(Mockito.anyString(), Mockito.any(), 
Mockito.any());
 
         // Test producer creation
         resetChannel();
@@ -739,11 +704,12 @@ public class ServerCnxTest {
     @Test(timeOut = 30000)
     public void testProducerCommandWithAuthorizationNegative() throws 
Exception {
         AuthorizationService authorizationService = 
mock(AuthorizationService.class);
-        
doReturn(CompletableFuture.completedFuture(false)).when(authorizationService).allowTopicOperationAsync(Mockito.any(),
-                Mockito.any(), Mockito.any(), Mockito.any());
+        
doReturn(CompletableFuture.completedFuture(false)).when(authorizationService)
+                .allowTopicOperationAsync(Mockito.any(),
+                        Mockito.any(), Mockito.any(), Mockito.any());
         
doReturn(authorizationService).when(brokerService).getAuthorizationService();
-        doReturn(true).when(brokerService).isAuthenticationEnabled();
-        doReturn(true).when(brokerService).isAuthorizationEnabled();
+        svcConfig.setAuthenticationEnabled(true);
+        svcConfig.setAuthorizationEnabled(true);
         doReturn("prod1").when(brokerService).generateUniqueProducerName();
         resetChannel();
         setChannelConnected();
@@ -925,12 +891,13 @@ public class ServerCnxTest {
         // Delay the topic creation in a deterministic way
         CompletableFuture<Runnable> openTopicFuture = new 
CompletableFuture<>();
         doAnswer(invocationOnMock -> {
-            openTopicFuture.complete(() -> {
-                ((OpenLedgerCallback) 
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
-            });
+            openTopicFuture.complete(
+                    () -> ((OpenLedgerCallback) 
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock,
+                            null));
             return null;
-        }).when(mlFactoryMock).asyncOpen(matches(".*success.*"), 
any(ManagedLedgerConfig.class),
-                any(OpenLedgerCallback.class), any(Supplier.class), any());
+        }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+                .asyncOpen(matches(".*success.*"), 
any(ManagedLedgerConfig.class),
+                        any(OpenLedgerCallback.class), any(Supplier.class), 
any());
 
         // In a create producer timeout from client side we expect to see this 
sequence of commands :
         // 1. create producer
@@ -946,7 +913,7 @@ public class ServerCnxTest {
                 producerName, Collections.emptyMap(), false);
         channel.writeInbound(createProducer1);
 
-        ByteBuf closeProducer = Commands.newCloseProducer(1 /* producer id */, 
2 /* request id */ );
+        ByteBuf closeProducer = Commands.newCloseProducer(1 /* producer id */, 
2 /* request id */);
         channel.writeInbound(closeProducer);
 
         ByteBuf createProducer2 = Commands.newProducer(successTopicName, 1 /* 
producer id */, 3 /* request id */,
@@ -979,12 +946,13 @@ public class ServerCnxTest {
         // Delay the topic creation in a deterministic way
         CompletableFuture<Runnable> openTopicFuture = new 
CompletableFuture<>();
         doAnswer(invocationOnMock -> {
-            openTopicFuture.complete(() -> {
-                ((OpenLedgerCallback) 
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
-            });
+            openTopicFuture.complete(
+                    () -> ((OpenLedgerCallback) 
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock,
+                            null));
             return null;
-        }).when(mlFactoryMock).asyncOpen(matches(".*success.*"), 
any(ManagedLedgerConfig.class),
-                any(OpenLedgerCallback.class), any(Supplier.class), any());
+        }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+                .asyncOpen(matches(".*success.*"), 
any(ManagedLedgerConfig.class),
+                        any(OpenLedgerCallback.class), any(Supplier.class), 
any());
 
         // In a create producer timeout from client side we expect to see this 
sequence of commands :
         // 1. create producer
@@ -1002,7 +970,7 @@ public class ServerCnxTest {
                 producerName, Collections.emptyMap(), false);
         channel.writeInbound(createProducer1);
 
-        ByteBuf closeProducer = Commands.newCloseProducer(1 /* producer id */, 
2 /* request id */ );
+        ByteBuf closeProducer = Commands.newCloseProducer(1 /* producer id */, 
2 /* request id */);
         channel.writeInbound(closeProducer);
 
         ByteBuf createProducer2 = Commands.newProducer(successTopicName, 1 /* 
producer id */, 3 /* request id */,
@@ -1044,16 +1012,14 @@ public class ServerCnxTest {
 
         // Delay the topic creation in a deterministic way
         CountDownLatch topicCreationDelayLatch = new CountDownLatch(1);
-        doAnswer(new Answer<Object>() {
-            @Override
-            public Object answer(InvocationOnMock invocationOnMock) throws 
Throwable {
-                topicCreationDelayLatch.await();
+        doAnswer((Answer<Object>) invocationOnMock -> {
+            topicCreationDelayLatch.await();
 
-                ((OpenLedgerCallback) 
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
-                return null;
-            }
-        }).when(mlFactoryMock).asyncOpen(matches(".*success.*"), 
any(ManagedLedgerConfig.class),
-                any(OpenLedgerCallback.class), any(Supplier.class), any());
+            ((OpenLedgerCallback) 
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
+            return null;
+        }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+                .asyncOpen(matches(".*success.*"), 
any(ManagedLedgerConfig.class),
+                        any(OpenLedgerCallback.class), any(Supplier.class), 
any());
 
         // In a create producer timeout from client side we expect to see this 
sequence of commands :
         // 1. create producer
@@ -1069,7 +1035,7 @@ public class ServerCnxTest {
                 producerName, Collections.emptyMap(), false);
         channel.writeInbound(createProducer1);
 
-        ByteBuf closeProducer1 = Commands.newCloseProducer(1 /* producer id 
*/, 2 /* request id */ );
+        ByteBuf closeProducer1 = Commands.newCloseProducer(1 /* producer id 
*/, 2 /* request id */);
         channel.writeInbound(closeProducer1);
 
         ByteBuf createProducer2 = Commands.newProducer(successTopicName, 1 /* 
producer id */, 3 /* request id */,
@@ -1126,12 +1092,13 @@ public class ServerCnxTest {
         // Delay the topic creation in a deterministic way
         CompletableFuture<Runnable> openFailedTopic = new 
CompletableFuture<>();
         doAnswer(invocationOnMock -> {
-            openFailedTopic.complete(() -> {
-                ((OpenLedgerCallback) 
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
-            });
+            openFailedTopic.complete(
+                    () -> ((OpenLedgerCallback) 
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock,
+                            null));
             return null;
-        }).when(mlFactoryMock).asyncOpen(matches(".*fail.*"), 
any(ManagedLedgerConfig.class),
-                any(OpenLedgerCallback.class), any(Supplier.class), any());
+        }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+                .asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class),
+                        any(OpenLedgerCallback.class), any(Supplier.class), 
any());
 
         // In a create producer timeout from client side we expect to see this 
sequence of commands :
         // 1. create a failure producer which will timeout creation after 
100msec
@@ -1148,7 +1115,7 @@ public class ServerCnxTest {
                 producerName, Collections.emptyMap(), false);
         channel.writeInbound(createProducer1);
 
-        ByteBuf closeProducer = Commands.newCloseProducer(1 /* producer id */, 
2 /* request id */ );
+        ByteBuf closeProducer = Commands.newCloseProducer(1 /* producer id */, 
2 /* request id */);
         channel.writeInbound(closeProducer);
 
         ByteBuf createProducer2 = Commands.newProducer(successTopicName, 1 /* 
producer id */, 3 /* request id */,
@@ -1196,13 +1163,14 @@ public class ServerCnxTest {
         // Delay the topic creation in a deterministic way
         CompletableFuture<Runnable> openTopicTask = new CompletableFuture<>();
         doAnswer(invocationOnMock -> {
-            openTopicTask.complete(() -> {
-                ((OpenLedgerCallback) 
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
-            });
+            openTopicTask.complete(
+                    () -> ((OpenLedgerCallback) 
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock,
+                            null));
 
             return null;
-        }).when(mlFactoryMock).asyncOpen(matches(".*success.*"), 
any(ManagedLedgerConfig.class),
-                any(OpenLedgerCallback.class), any(Supplier.class), any());
+        }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+                .asyncOpen(matches(".*success.*"), 
any(ManagedLedgerConfig.class),
+                        any(OpenLedgerCallback.class), any(Supplier.class), 
any());
 
         // In a subscribe timeout from client side we expect to see this 
sequence of commands :
         // 1. Subscribe
@@ -1271,22 +1239,22 @@ public class ServerCnxTest {
         // Delay the topic creation in a deterministic way
         CompletableFuture<Runnable> openTopicSuccess = new 
CompletableFuture<>();
         doAnswer(invocationOnMock -> {
-            openTopicSuccess.complete(() -> {
-                ((OpenLedgerCallback) 
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
-            });
+            openTopicSuccess.complete(
+                    () -> ((OpenLedgerCallback) 
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock,
+                            null));
             return null;
-        }).when(mlFactoryMock).asyncOpen(matches(".*success.*"), 
any(ManagedLedgerConfig.class),
-                any(OpenLedgerCallback.class), any(Supplier.class), any());
+        }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+                .asyncOpen(matches(".*success.*"), 
any(ManagedLedgerConfig.class),
+                        any(OpenLedgerCallback.class), any(Supplier.class), 
any());
 
         CompletableFuture<Runnable> openTopicFail = new CompletableFuture<>();
         doAnswer(invocationOnMock -> {
-            openTopicFail.complete(() -> {
-                ((OpenLedgerCallback) invocationOnMock.getArguments()[2])
-                        .openLedgerFailed(new ManagedLedgerException("Managed 
ledger failure"), null);
-            });
+            openTopicFail.complete(() -> ((OpenLedgerCallback) 
invocationOnMock.getArguments()[2])
+                    .openLedgerFailed(new ManagedLedgerException("Managed 
ledger failure"), null));
             return null;
-        }).when(mlFactoryMock).asyncOpen(matches(".*fail.*"), 
any(ManagedLedgerConfig.class),
-                any(OpenLedgerCallback.class), any(Supplier.class), any());
+        }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+                .asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class),
+                        any(OpenLedgerCallback.class), any(Supplier.class), 
any());
 
         // In a subscribe timeout from client side we expect to see this 
sequence of commands :
         // 1. Subscribe against failtopic which will fail after 100msec
@@ -1301,7 +1269,7 @@ public class ServerCnxTest {
                 "test" /* consumer name */, 0 /* avoid reseting cursor */);
         channel.writeInbound(subscribe1);
 
-        ByteBuf closeConsumer = Commands.newCloseConsumer(1 /* consumer id */, 
2 /* request id */ );
+        ByteBuf closeConsumer = Commands.newCloseConsumer(1 /* consumer id */, 
2 /* request id */);
         channel.writeInbound(closeConsumer);
 
         ByteBuf subscribe2 = Commands.newSubscribe(successTopicName, //
@@ -1352,8 +1320,8 @@ public class ServerCnxTest {
 
         resetChannel();
         setChannelConnected();
-        doReturn(false).when(brokerService).isAuthenticationEnabled();
-        doReturn(false).when(brokerService).isAuthorizationEnabled();
+        svcConfig.setAuthenticationEnabled(false);
+        svcConfig.setAuthorizationEnabled(false);
         // test SUBSCRIBE on topic and cursor creation success
         ByteBuf clientCommand = Commands.newSubscribe(successTopicName, //
                 successSubName, 1 /* consumer id */, 1 /* request id */, 
SubType.Exclusive, 0,
@@ -1392,8 +1360,8 @@ public class ServerCnxTest {
         resetChannel();
         setChannelConnected();
         setConnectionVersion(ProtocolVersion.v3.getValue());
-        doReturn(false).when(brokerService).isAuthenticationEnabled();
-        doReturn(false).when(brokerService).isAuthorizationEnabled();
+        svcConfig.setAuthenticationEnabled(false);
+        svcConfig.setAuthorizationEnabled(false);
         // test SUBSCRIBE on topic and cursor creation success
         ByteBuf clientCommand = Commands.newSubscribe(successTopicName, //
                 successSubName, 1 /* consumer id */, 1 /* request id */, 
SubType.Exclusive, 0 /* priority */,
@@ -1420,11 +1388,12 @@ public class ServerCnxTest {
     @Test(timeOut = 30000)
     public void testSubscribeCommandWithAuthorizationPositive() throws 
Exception {
         AuthorizationService authorizationService = 
mock(AuthorizationService.class);
-        
doReturn(CompletableFuture.completedFuture(true)).when(authorizationService).allowTopicOperationAsync(Mockito.any(),
-                Mockito.any(), Mockito.any(), Mockito.any());
+        
doReturn(CompletableFuture.completedFuture(true)).when(authorizationService)
+                .allowTopicOperationAsync(Mockito.any(),
+                        Mockito.any(), Mockito.any(), Mockito.any());
         
doReturn(authorizationService).when(brokerService).getAuthorizationService();
-        doReturn(true).when(brokerService).isAuthenticationEnabled();
-        doReturn(true).when(brokerService).isAuthorizationEnabled();
+        svcConfig.setAuthenticationEnabled(true);
+        svcConfig.setAuthorizationEnabled(true);
         resetChannel();
         setChannelConnected();
 
@@ -1442,18 +1411,20 @@ public class ServerCnxTest {
     @Test(timeOut = 30000)
     public void testSubscribeCommandWithAuthorizationNegative() throws 
Exception {
         AuthorizationService authorizationService = 
mock(AuthorizationService.class);
-        
doReturn(CompletableFuture.completedFuture(false)).when(authorizationService).allowTopicOperationAsync(Mockito.any(),
-                Mockito.any(), Mockito.any(), Mockito.any());
+        
doReturn(CompletableFuture.completedFuture(false)).when(authorizationService)
+                .allowTopicOperationAsync(Mockito.any(),
+                        Mockito.any(), Mockito.any(), Mockito.any());
         
doReturn(authorizationService).when(brokerService).getAuthorizationService();
-        doReturn(true).when(brokerService).isAuthenticationEnabled();
-        doReturn(true).when(brokerService).isAuthorizationEnabled();
+        svcConfig.setAuthenticationEnabled(true);
+        svcConfig.setAuthorizationEnabled(true);
 
         resetChannel();
         setChannelConnected();
 
         // test SUBSCRIBE on topic and cursor creation success
         ByteBuf clientCommand = Commands.newSubscribe(successTopicName, //
-                successSubName, 1 /* consumer id */, 1 /* request id */, 
SubType.Exclusive, 0, "test" /* consumer name */, 0 /*avoid reseting cursor*/);
+                successSubName, 1 /* consumer id */, 1 /* request id */, 
SubType.Exclusive, 0,
+                "test" /* consumer name */, 0 /*avoid reseting cursor*/);
         channel.writeInbound(clientCommand);
         assertTrue(getResponse() instanceof CommandError);
 
@@ -1467,15 +1438,16 @@ public class ServerCnxTest {
 
         ByteBuf clientCommand = Commands.newSubscribe(successTopicName, 
successSubName, 1 /* consumer id */,
                 1 /*
-                   * request id
-                   */, SubType.Exclusive, 0, "test" /* consumer name */, 0 
/*avoid reseting cursor*/);
+                 * request id
+                 */, SubType.Exclusive, 0, "test" /* consumer name */, 0 
/*avoid reseting cursor*/);
         channel.writeInbound(clientCommand);
         assertTrue(getResponse() instanceof CommandSuccess);
 
         PositionImpl pos = new PositionImpl(0, 0);
 
-        clientCommand = Commands.newAck(1 /* consumer id */, 
pos.getLedgerId(), pos.getEntryId(), null, AckType.Individual,
-                                        null, Collections.emptyMap(), -1);
+        clientCommand =
+                Commands.newAck(1 /* consumer id */, pos.getLedgerId(), 
pos.getEntryId(), null, AckType.Individual,
+                        null, Collections.emptyMap(), -1);
         channel.writeInbound(clientCommand);
 
         // verify nothing is sent out on the wire after ack
@@ -1522,12 +1494,14 @@ public class ServerCnxTest {
         // add `clusterDispatchRate` otherwise there will be a NPE
         // 
`org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceReplicatorDispatchRate`
         policies.replicatorDispatchRate = new HashMap<>();
-        
doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(namespaceResources)
-                
.getPoliciesAsync(TopicName.get(encryptionRequiredTopicName).getNamespaceObject());
+        testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
+                
.createPolicies(TopicName.get(encryptionRequiredTopicName).getNamespaceObject(),
+                        policies);
 
         // test success case: encrypted producer can connect
-        ByteBuf clientCommand = 
Commands.newProducer(encryptionRequiredTopicName, 1 /* producer id */, 1 /* 
request id */,
-                "encrypted-producer", true, Collections.emptyMap(), false);
+        ByteBuf clientCommand =
+                Commands.newProducer(encryptionRequiredTopicName, 1 /* 
producer id */, 1 /* request id */,
+                        "encrypted-producer", true, Collections.emptyMap(), 
false);
         channel.writeInbound(clientCommand);
 
         Object response = getResponse();
@@ -1558,12 +1532,14 @@ public class ServerCnxTest {
         // add `clusterDispatchRate` otherwise there will be a NPE
         // 
`org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceReplicatorDispatchRate`
         policies.replicatorDispatchRate = new HashMap<>();
-        
doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(namespaceResources)
-                
.getPoliciesAsync(TopicName.get(encryptionRequiredTopicName).getNamespaceObject());
+        testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
+                
.createPolicies(TopicName.get(encryptionRequiredTopicName).getNamespaceObject(),
+                        policies);
 
         // test failure case: unencrypted producer cannot connect
-        ByteBuf clientCommand = 
Commands.newProducer(encryptionRequiredTopicName, 2 /* producer id */, 2 /* 
request id */,
-                "unencrypted-producer", false, Collections.emptyMap(), false);
+        ByteBuf clientCommand =
+                Commands.newProducer(encryptionRequiredTopicName, 2 /* 
producer id */, 2 /* request id */,
+                        "unencrypted-producer", false, Collections.emptyMap(), 
false);
         channel.writeInbound(clientCommand);
 
         Object response = getResponse();
@@ -1598,12 +1574,14 @@ public class ServerCnxTest {
         // add `clusterDispatchRate` otherwise there will be a NPE
         // 
`org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceReplicatorDispatchRate`
         policies.replicatorDispatchRate = new HashMap<>();
-        
doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(namespaceResources)
-                
.getPoliciesAsync(TopicName.get(encryptionRequiredTopicName).getNamespaceObject());
+        testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
+                
.createPolicies(TopicName.get(encryptionRequiredTopicName).getNamespaceObject(),
+                        policies);
 
         // test failure case: unencrypted producer cannot connect
-        ByteBuf clientCommand = 
Commands.newProducer(encryptionRequiredTopicName, 2 /* producer id */, 2 /* 
request id */,
-                "unencrypted-producer", false, Collections.emptyMap(), false);
+        ByteBuf clientCommand =
+                Commands.newProducer(encryptionRequiredTopicName, 2 /* 
producer id */, 2 /* request id */,
+                        "unencrypted-producer", false, Collections.emptyMap(), 
false);
         channel.writeInbound(clientCommand);
 
         Object response = getResponse();
@@ -1636,11 +1614,13 @@ public class ServerCnxTest {
         // add `clusterDispatchRate` otherwise there will be a NPE
         // 
`org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceReplicatorDispatchRate`
         policies.replicatorDispatchRate = new HashMap<>();
-        
doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(namespaceResources)
-                
.getPoliciesAsync(TopicName.get(encryptionRequiredTopicName).getNamespaceObject());
+        testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
+                
.createPolicies(TopicName.get(encryptionRequiredTopicName).getNamespaceObject(),
+                        policies);
 
-        ByteBuf clientCommand = 
Commands.newProducer(encryptionRequiredTopicName, 1 /* producer id */, 1 /* 
request id */,
-                "prod-name", true, Collections.emptyMap(), false);
+        ByteBuf clientCommand =
+                Commands.newProducer(encryptionRequiredTopicName, 1 /* 
producer id */, 1 /* request id */,
+                        "prod-name", true, Collections.emptyMap(), false);
         channel.writeInbound(clientCommand);
         assertTrue(getResponse() instanceof CommandProducerSuccess);
 
@@ -1680,11 +1660,13 @@ public class ServerCnxTest {
         // add `clusterDispatchRate` otherwise there will be a NPE
         // 
`org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceReplicatorDispatchRate`
         policies.replicatorDispatchRate = new HashMap<>();
-        
doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(namespaceResources)
-                
.getPoliciesAsync(TopicName.get(encryptionRequiredTopicName).getNamespaceObject());
+        testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
+                
.createPolicies(TopicName.get(encryptionRequiredTopicName).getNamespaceObject(),
+                        policies);
 
-        ByteBuf clientCommand = 
Commands.newProducer(encryptionRequiredTopicName, 1 /* producer id */, 1 /* 
request id */,
-                "prod-name", true, Collections.emptyMap(), false);
+        ByteBuf clientCommand =
+                Commands.newProducer(encryptionRequiredTopicName, 1 /* 
producer id */, 1 /* request id */,
+                        "prod-name", true, Collections.emptyMap(), false);
         channel.writeInbound(clientCommand);
         assertTrue(getResponse() instanceof CommandProducerSuccess);
 
@@ -1751,18 +1733,16 @@ public class ServerCnxTest {
     private void setupMLAsyncCallbackMocks() {
         ledgerMock = mock(ManagedLedger.class);
         cursorMock = mock(ManagedCursor.class);
-        doReturn(new ArrayList<Object>()).when(ledgerMock).getCursors();
+        doReturn(new ArrayList<>()).when(ledgerMock).getCursors();
 
         // call openLedgerComplete with ledgerMock on ML factory asyncOpen
-        doAnswer(new Answer<Object>() {
-            @Override
-            public Object answer(InvocationOnMock invocationOnMock) throws 
Throwable {
-                Thread.sleep(300);
-                ((OpenLedgerCallback) 
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
-                return null;
-            }
-        }).when(mlFactoryMock).asyncOpen(matches(".*success.*"), 
any(ManagedLedgerConfig.class),
-                any(OpenLedgerCallback.class), any(Supplier.class), any());
+        doAnswer((Answer<Object>) invocationOnMock -> {
+            Thread.sleep(300);
+            ((OpenLedgerCallback) 
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
+            return null;
+        }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+                .asyncOpen(matches(".*success.*"), 
any(ManagedLedgerConfig.class),
+                        any(OpenLedgerCallback.class), any(Supplier.class), 
any());
 
         // call openLedgerFailed on ML factory asyncOpen
         doAnswer((Answer<Object>) invocationOnMock -> {
@@ -1771,19 +1751,17 @@ public class ServerCnxTest {
                     .openLedgerFailed(new ManagedLedgerException("Managed 
ledger failure"), null)).start();
 
             return null;
-        }).when(mlFactoryMock).asyncOpen(matches(".*fail.*"), 
any(ManagedLedgerConfig.class),
-                any(OpenLedgerCallback.class), any(Supplier.class), any());
+        }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+                .asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class),
+                        any(OpenLedgerCallback.class), any(Supplier.class), 
any());
 
         // call addComplete on ledger asyncAddEntry
-        doAnswer(new Answer<Object>() {
-            @Override
-            public Object answer(InvocationOnMock invocationOnMock) throws 
Throwable {
-                ((AddEntryCallback) 
invocationOnMock.getArguments()[1]).addComplete(
-                        new PositionImpl(-1, -1),
-                        null,
-                        invocationOnMock.getArguments()[2]);
-                return null;
-            }
+        doAnswer((Answer<Object>) invocationOnMock -> {
+            ((AddEntryCallback) 
invocationOnMock.getArguments()[1]).addComplete(
+                    new PositionImpl(-1, -1),
+                    null,
+                    invocationOnMock.getArguments()[2]);
+            return null;
         }).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), 
any(AddEntryCallback.class), any());
 
         doAnswer((Answer<Object>) invocationOnMock -> 
true).when(cursorMock).isDurable();
@@ -1792,29 +1770,34 @@ public class ServerCnxTest {
             Thread.sleep(300);
             ((OpenCursorCallback) 
invocationOnMock.getArguments()[3]).openCursorComplete(cursorMock, null);
             return null;
-        }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), 
any(InitialPosition.class), any(OpenCursorCallback.class), any());
+        }).when(ledgerMock)
+                .asyncOpenCursor(matches(".*success.*"), 
any(InitialPosition.class), any(OpenCursorCallback.class),
+                        any());
 
         doAnswer((Answer<Object>) invocationOnMock -> {
             Thread.sleep(300);
             ((OpenCursorCallback) 
invocationOnMock.getArguments()[4]).openCursorComplete(cursorMock, null);
             return null;
-        }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), 
any(InitialPosition.class), any(Map.class), any(Map.class),
-                any(OpenCursorCallback.class), any());
+        }).when(ledgerMock)
+                .asyncOpenCursor(matches(".*success.*"), 
any(InitialPosition.class), any(Map.class), any(Map.class),
+                        any(OpenCursorCallback.class), any());
 
         doAnswer((Answer<Object>) invocationOnMock -> {
             Thread.sleep(300);
             ((OpenCursorCallback) invocationOnMock.getArguments()[3])
                     .openCursorFailed(new ManagedLedgerException("Managed 
ledger failure"), null);
             return null;
-        }).when(ledgerMock).asyncOpenCursor(matches(".*fail.*"), 
any(InitialPosition.class), any(OpenCursorCallback.class), any());
+        }).when(ledgerMock)
+                .asyncOpenCursor(matches(".*fail.*"), 
any(InitialPosition.class), any(OpenCursorCallback.class), any());
 
         doAnswer((Answer<Object>) invocationOnMock -> {
             Thread.sleep(300);
             ((OpenCursorCallback) invocationOnMock.getArguments()[3])
                     .openCursorFailed(new ManagedLedgerException("Managed 
ledger failure"), null);
             return null;
-        }).when(ledgerMock).asyncOpenCursor(matches(".*fail.*"), 
any(InitialPosition.class), any(Map.class), any(Map.class),
-                any(OpenCursorCallback.class), any());
+        }).when(ledgerMock)
+                .asyncOpenCursor(matches(".*fail.*"), 
any(InitialPosition.class), any(Map.class), any(Map.class),
+                        any(OpenCursorCallback.class), any());
 
         doAnswer((Answer<Object>) invocationOnMock -> {
             ((DeleteCursorCallback) 
invocationOnMock.getArguments()[1]).deleteCursorComplete(null);
@@ -2085,7 +2068,7 @@ public class ServerCnxTest {
     }
 
     @Test
-    public void testNeverDelayConsumerFutureWhenNotFail() throws Exception{
+    public void testNeverDelayConsumerFutureWhenNotFail() throws Exception {
         // Mock ServerCnx.field: consumers
         ConcurrentLongHashMap.Builder mapBuilder = 
Mockito.mock(ConcurrentLongHashMap.Builder.class);
         
Mockito.when(mapBuilder.expectedItems(Mockito.anyInt())).thenReturn(mapBuilder);
@@ -2100,12 +2083,12 @@ public class ServerCnxTest {
         // case2: exists existingConsumerFuture, delay complete after execute 
'isDone()' many times
         // Why is the design so complicated, see: 
https://github.com/apache/pulsar/pull/15051
         // Try a delay of 3 stages. The simulation is successful after 
repeated judgments.
-        for(AtomicInteger futureWillDoneAfterDelayTimes = new AtomicInteger(1);
-                                            
futureWillDoneAfterDelayTimes.intValue() <= 3;
-                                            
futureWillDoneAfterDelayTimes.incrementAndGet()){
+        for (AtomicInteger futureWillDoneAfterDelayTimes = new 
AtomicInteger(1);
+             futureWillDoneAfterDelayTimes.intValue() <= 3;
+             futureWillDoneAfterDelayTimes.incrementAndGet()) {
             final AtomicInteger futureCallTimes = new AtomicInteger();
             final Consumer mockConsumer = Mockito.mock(Consumer.class);
-            CompletableFuture existingConsumerFuture = new 
CompletableFuture<Consumer>(){
+            CompletableFuture existingConsumerFuture = new 
CompletableFuture<Consumer>() {
 
                 private boolean complete;
 
@@ -2121,27 +2104,27 @@ public class ServerCnxTest {
 
                 // if trig "getNow()", then complete
                 @Override
-                public Consumer get(){
+                public Consumer get() {
                     complete = true;
                     return mockConsumer;
                 }
 
                 // if trig "get()", then complete
                 @Override
-                public Consumer get(long timeout, TimeUnit unit){
+                public Consumer get(long timeout, TimeUnit unit) {
                     complete = true;
                     return mockConsumer;
                 }
 
                 // if trig "get()", then complete
                 @Override
-                public Consumer getNow(Consumer ifAbsent){
+                public Consumer getNow(Consumer ifAbsent) {
                     complete = true;
                     return mockConsumer;
                 }
 
                 // never fail
-                public boolean isCompletedExceptionally(){
+                public boolean isCompletedExceptionally() {
                     return false;
                 }
             };
@@ -2155,8 +2138,8 @@ public class ServerCnxTest {
                 resetChannel();
                 setChannelConnected();
                 // auth check disable
-                doReturn(false).when(brokerService).isAuthenticationEnabled();
-                doReturn(false).when(brokerService).isAuthorizationEnabled();
+                svcConfig.setAuthenticationEnabled(false);
+                svcConfig.setAuthorizationEnabled(false);
                 // do subscribe
                 ByteBuf clientCommand = 
Commands.newSubscribe(successTopicName, //
                         successSubName, 1 /* consumer id */, 1 /* request id 
*/, SubType.Exclusive, 0,
@@ -2200,8 +2183,8 @@ public class ServerCnxTest {
             resetChannel();
             setChannelConnected();
             // auth check disable
-            doReturn(false).when(brokerService).isAuthenticationEnabled();
-            doReturn(false).when(brokerService).isAuthorizationEnabled();
+            svcConfig.setAuthenticationEnabled(false);
+            svcConfig.setAuthorizationEnabled(false);
             // do subscribe
             ByteBuf clientCommand = Commands.newSubscribe(successTopicName, //
                     successSubName, 1 /* consumer id */, 1 /* request id */, 
SubType.Exclusive, 0,
@@ -2570,7 +2553,8 @@ public class ServerCnxTest {
         setChannelConnected();
         Topic topic = mock(Topic.class);
         
doReturn(CompletableFuture.completedFuture(null)).when(topic).endTxn(any(TxnID.class),
 anyInt(), anyLong());
-        
doReturn(CompletableFuture.completedFuture(Optional.of(topic))).when(brokerService).getTopicIfExists(any(String.class));
+        
doReturn(CompletableFuture.completedFuture(Optional.of(topic))).when(brokerService)
+                .getTopicIfExists(any(String.class));
         ByteBuf clientCommand = Commands.newEndTxnOnPartition(89L, 1L, 12L,
                 successTopicName, TxnAction.COMMIT, 1L);
         channel.writeInbound(clientCommand);
@@ -2596,8 +2580,10 @@ public class ServerCnxTest {
         resetChannel();
         setChannelConnected();
         Topic topic = mock(Topic.class);
-        doReturn(CompletableFuture.failedFuture(new RuntimeException("server 
error"))).when(topic).endTxn(any(TxnID.class), anyInt(), anyLong());
-        
doReturn(CompletableFuture.completedFuture(Optional.of(topic))).when(brokerService).getTopicIfExists(any(String.class));
+        doReturn(CompletableFuture.failedFuture(new RuntimeException("server 
error"))).when(topic)
+                .endTxn(any(TxnID.class), anyInt(), anyLong());
+        
doReturn(CompletableFuture.completedFuture(Optional.of(topic))).when(brokerService)
+                .getTopicIfExists(any(String.class));
         ByteBuf clientCommand = Commands.newEndTxnOnPartition(89L, 1L, 12L,
                 successTopicName, TxnAction.COMMIT, 1L);
         channel.writeInbound(clientCommand);
@@ -2623,11 +2609,13 @@ public class ServerCnxTest {
         resetChannel();
         setChannelConnected();
         Topic topic = mock(Topic.class);
-        final org.apache.pulsar.broker.service.Subscription sub = 
mock(org.apache.pulsar.broker.service.Subscription.class);
+        final org.apache.pulsar.broker.service.Subscription sub =
+                mock(org.apache.pulsar.broker.service.Subscription.class);
         doReturn(sub).when(topic).getSubscription(any());
         doReturn(CompletableFuture.completedFuture(null))
                 .when(sub).endTxn(anyLong(), anyLong(), anyInt(), anyLong());
-        
doReturn(CompletableFuture.completedFuture(Optional.of(topic))).when(brokerService).getTopicIfExists(any(String.class));
+        
doReturn(CompletableFuture.completedFuture(Optional.of(topic))).when(brokerService)
+                .getTopicIfExists(any(String.class));
 
         ByteBuf clientCommand = Commands.newEndTxnOnSubscription(89L, 1L, 12L,
                 successTopicName, successSubName, TxnAction.COMMIT, 1L);
@@ -2656,11 +2644,13 @@ public class ServerCnxTest {
         setChannelConnected();
         Topic topic = mock(Topic.class);
 
-        final org.apache.pulsar.broker.service.Subscription sub = 
mock(org.apache.pulsar.broker.service.Subscription.class);
+        final org.apache.pulsar.broker.service.Subscription sub =
+                mock(org.apache.pulsar.broker.service.Subscription.class);
         doReturn(sub).when(topic).getSubscription(any());
         doReturn(CompletableFuture.failedFuture(new RuntimeException("server 
error")))
                 .when(sub).endTxn(anyLong(), anyLong(), anyInt(), anyLong());
-        
doReturn(CompletableFuture.completedFuture(Optional.of(topic))).when(brokerService).getTopicIfExists(any(String.class));
+        
doReturn(CompletableFuture.completedFuture(Optional.of(topic))).when(brokerService)
+                .getTopicIfExists(any(String.class));
 
         ByteBuf clientCommand = Commands.newEndTxnOnSubscription(89L, 1L, 12L,
                 successTopicName, successSubName, TxnAction.COMMIT, 1L);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
index e869d71e80c..9f913795b2a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
@@ -27,7 +27,6 @@ import static org.mockito.Mockito.doCallRealMethod;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
@@ -113,7 +112,7 @@ public class PersistentSubscriptionTest {
         executor = 
OrderedExecutor.newBuilder().numThreads(1).name("persistent-subscription-test").build();
         eventLoopGroup = new NioEventLoopGroup();
 
-        ServiceConfiguration svcConfig = spy(ServiceConfiguration.class);
+        ServiceConfiguration svcConfig = new ServiceConfiguration();
         svcConfig.setBrokerShutdownTimeoutMs(0L);
         svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
         svcConfig.setTransactionCoordinatorEnabled(true);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/ProcessHandlerFilterTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/ProcessHandlerFilterTest.java
index cc427bcfd6a..8d9c514c4f6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/ProcessHandlerFilterTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/ProcessHandlerFilterTest.java
@@ -40,11 +40,11 @@ public class ProcessHandlerFilterTest {
         BrokerInterceptor spyInterceptor = 
Mockito.spy(BrokerInterceptor.class);
         HttpServletRequest mockHttpServletRequest = 
Mockito.mock(HttpServletRequest.class);
         HttpServletResponse mockHttpServletResponse = 
Mockito.mock(HttpServletResponse.class);
-        ServiceConfiguration mockConfig = 
Mockito.mock(ServiceConfiguration.class);
+        ServiceConfiguration config = new ServiceConfiguration();
         FilterChain mockFilterChain = Mockito.mock(FilterChain.class);
         
Mockito.doReturn(spyInterceptor).when(mockPulsarService).getBrokerInterceptor();
-        Mockito.doReturn(mockConfig).when(mockPulsarService).getConfig();
-        Mockito.doReturn(Sets.newHashSet("Interceptor1", 
"Interceptor2")).when(mockConfig).getBrokerInterceptors();
+        Mockito.doReturn(config).when(mockPulsarService).getConfig();
+        config.setBrokerInterceptors(Sets.newHashSet("Interceptor1", 
"Interceptor2"));
         ProcessHandlerFilter processHandlerFilter = new 
ProcessHandlerFilter(mockPulsarService);
         processHandlerFilter.doFilter(mockHttpServletRequest, 
mockHttpServletResponse, mockFilterChain);
         Mockito.verify(spyInterceptor).onFilter(mockHttpServletRequest, 
mockHttpServletResponse, mockFilterChain);
@@ -55,11 +55,11 @@ public class ProcessHandlerFilterTest {
         PulsarService mockPulsarService = Mockito.mock(PulsarService.class);
         BrokerInterceptor spyInterceptor = 
Mockito.mock(BrokerInterceptor.class);
         HttpServletResponse mockHttpServletResponse = 
Mockito.mock(HttpServletResponse.class);
-        ServiceConfiguration mockConfig = 
Mockito.mock(ServiceConfiguration.class);
+        ServiceConfiguration config = new ServiceConfiguration();
         FilterChain spyFilterChain = Mockito.spy(FilterChain.class);
         
Mockito.doReturn(spyInterceptor).when(mockPulsarService).getBrokerInterceptor();
-        Mockito.doReturn(mockConfig).when(mockPulsarService).getConfig();
-        Mockito.doReturn(new 
HashSet<>()).when(mockConfig).getBrokerInterceptors();
+        Mockito.doReturn(config).when(mockPulsarService).getConfig();
+        config.setBrokerInterceptors(new HashSet<>());
         // empty interceptor list
         HttpServletRequest mockHttpServletRequest = 
Mockito.mock(HttpServletRequest.class);
         ProcessHandlerFilter processHandlerFilter = new 
ProcessHandlerFilter(mockPulsarService);
@@ -67,7 +67,7 @@ public class ProcessHandlerFilterTest {
         Mockito.verify(spyFilterChain).doFilter(mockHttpServletRequest, 
mockHttpServletResponse);
         Mockito.clearInvocations(spyFilterChain);
         // request has MULTIPART_FORM_DATA content-type
-        
Mockito.doReturn(Sets.newHashSet("Interceptor1","Interceptor2")).when(mockConfig).getBrokerInterceptors();
+        
config.setBrokerInterceptors(Sets.newHashSet("Interceptor1","Interceptor2"));
         HttpServletRequest mockHttpServletRequest2 = 
Mockito.mock(HttpServletRequest.class);
         
Mockito.doReturn(MediaType.MULTIPART_FORM_DATA).when(mockHttpServletRequest2).getContentType();
         ProcessHandlerFilter processHandlerFilter2 = new 
ProcessHandlerFilter(mockPulsarService);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java
index f5c3c1de1c2..6b3b05405ba 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java
@@ -19,12 +19,10 @@
 package org.apache.pulsar.client.api;
 
 import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
-import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-
 import com.google.common.base.Function;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -81,7 +79,7 @@ public class ClientDeduplicationFailureTest {
         bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
         bkEnsemble.start();
 
-        config = spy(ServiceConfiguration.class);
+        config = new ServiceConfiguration();
         config.setClusterName("use");
         config.setWebServicePort(Optional.of(0));
         config.setMetadataStoreUrl("zk:127.0.0.1:" + 
bkEnsemble.getZookeeperPort());

Reply via email to