This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6112b6501e3 [improve][test] Reduce the use of Mockito spies/mocks in
tests (#19326)
6112b6501e3 is described below
commit 6112b6501e32b9d46612ebbc7a18482661894053
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Jan 25 22:38:30 2023 +0200
[improve][test] Reduce the use of Mockito spies/mocks in tests (#19326)
---
.../apache/pulsar/broker/TestPulsarService.java | 39 +-
.../broker/loadbalance/SimpleBrokerStartTest.java | 5 +-
.../loadbalance/SimpleLoadManagerImplTest.java | 3 +-
.../broker/lookup/http/HttpTopicLookupv2Test.java | 12 +-
.../broker/namespace/OwnershipCacheTest.java | 6 +-
.../PersistentDispatcherFailoverConsumerTest.java | 2 +-
.../service/PersistentTopicConcurrentTest.java | 3 +-
.../pulsar/broker/service/ServerCnxTest.java | 462 ++++++++++-----------
.../persistent/PersistentSubscriptionTest.java | 3 +-
.../broker/web/ProcessHandlerFilterTest.java | 14 +-
.../client/api/ClientDeduplicationFailureTest.java | 4 +-
11 files changed, 284 insertions(+), 269 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/TestPulsarService.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TestPulsarService.java
index 873be15b8ff..3c998515cb1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/TestPulsarService.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TestPulsarService.java
@@ -22,13 +22,16 @@ package org.apache.pulsar.broker;
import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgsRecordingInvocations;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import java.io.IOException;
import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
+import java.util.function.Supplier;
import lombok.Builder;
import lombok.Getter;
import lombok.ToString;
@@ -37,6 +40,7 @@ import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.stats.NullStatsProvider;
import org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.resources.TopicResources;
@@ -49,6 +53,7 @@ import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.metadata.api.MetadataStore;
@@ -168,7 +173,7 @@ public class TestPulsarService extends PulsarService {
throw new RuntimeException(e);
}
if (super.config == null) {
- ServiceConfiguration svcConfig =
spy(ServiceConfiguration.class);
+ ServiceConfiguration svcConfig = new
ServiceConfiguration();
svcConfig.setBrokerShutdownTimeoutMs(0L);
svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
svcConfig.setClusterName("pulsar-cluster");
@@ -225,9 +230,9 @@ public class TestPulsarService extends PulsarService {
brokerServiceFunction(pulsarService -> {
try {
if (!super.useSpies) {
- return new BrokerService(pulsarService,
super.eventLoopGroup);
+ return new
TestBrokerService(pulsarService, super.eventLoopGroup);
} else {
- return
spyWithClassAndConstructorArgs(BrokerService.class, pulsarService,
+ return
spyWithClassAndConstructorArgs(TestBrokerService.class, pulsarService,
super.eventLoopGroup);
}
} catch (Exception e) {
@@ -312,6 +317,18 @@ public class TestPulsarService extends PulsarService {
}
}
+ private static class TestBrokerService extends BrokerService {
+
+ TestBrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup)
throws Exception {
+ super(pulsar, eventLoopGroup);
+ }
+
+ @Override
+ protected CompletableFuture<Map<String, String>>
fetchTopicPropertiesAsync(TopicName topicName) {
+ return CompletableFuture.completedFuture(Collections.emptyMap());
+ }
+ }
+
private final MetadataStoreExtended localMetadataStore;
private final MetadataStoreExtended configurationMetadataStore;
private final PulsarResources pulsarResources;
@@ -326,6 +343,8 @@ public class TestPulsarService extends PulsarService {
private final PulsarClient pulsarClient;
+ private final NamespaceService namespaceService;
+
protected TestPulsarService(ServiceConfiguration config,
MetadataStoreExtended localMetadataStore,
MetadataStoreExtended
configurationMetadataStore, PulsarResources pulsarResources,
ManagedLedgerStorage
managedLedgerClientFactory,
@@ -341,6 +360,18 @@ public class TestPulsarService extends PulsarService {
this.compactor = compactor;
this.schemaRegistryService =
spyWithClassAndConstructorArgs(DefaultSchemaRegistryService.class);
this.pulsarClient = mock(PulsarClientImpl.class);
+ this.namespaceService = mock(NamespaceService.class);
+ try {
+ startNamespaceService();
+ } catch (PulsarServerException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+
+ @Override
+ public Supplier<NamespaceService> getNamespaceServiceProvider() throws
PulsarServerException {
+ return () -> namespaceService;
}
@Override
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java
index 7bc0d564f56..6de2eb90f12 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.loadbalance;
-import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.util.Optional;
@@ -44,7 +43,7 @@ public class SimpleBrokerStartTest {
LocalBookkeeperEnsemble bkEnsemble = new LocalBookkeeperEnsemble(3, 0,
() -> 0);
bkEnsemble.start();
// Start broker
- ServiceConfiguration config = spy(ServiceConfiguration.class);
+ ServiceConfiguration config = new ServiceConfiguration();
config.setClusterName("use");
config.setWebServicePort(Optional.of(0));
config.setMetadataStoreUrl("zk:127.0.0.1:" +
bkEnsemble.getZookeeperPort());
@@ -72,7 +71,7 @@ public class SimpleBrokerStartTest {
LocalBookkeeperEnsemble bkEnsemble = new LocalBookkeeperEnsemble(3, 0,
() -> 0);
bkEnsemble.start();
// Start broker
- ServiceConfiguration config = spy(ServiceConfiguration.class);
+ ServiceConfiguration config = new ServiceConfiguration();
config.setClusterName("use");
config.setWebServicePort(Optional.of(0));
config.setMetadataStoreUrl("zk:127.0.0.1:" +
bkEnsemble.getZookeeperPort());
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
index b267e5e1856..c4898786e3e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.broker.loadbalance;
import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgsRecordingInvocations;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
@@ -105,7 +104,7 @@ public class SimpleLoadManagerImplTest {
bkEnsemble.start();
// Start broker 1
- ServiceConfiguration config1 = spy(ServiceConfiguration.class);
+ ServiceConfiguration config1 = new ServiceConfiguration();
config1.setClusterName("use");
config1.setWebServicePort(Optional.of(0));
config1.setMetadataStoreUrl("zk:127.0.0.1:" +
bkEnsemble.getZookeeperPort());
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java
index 77541fb5cb4..6a6065bc289 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java
@@ -78,7 +78,7 @@ public class HttpTopicLookupv2Test {
pulsar = mock(PulsarService.class);
ns = mock(NamespaceService.class);
auth = mock(AuthorizationService.class);
- config = spy(ServiceConfiguration.class);
+ config = new ServiceConfiguration();
config.setClusterName("use");
clusters = new TreeSet<>();
clusters.add("use");
@@ -121,7 +121,7 @@ public class HttpTopicLookupv2Test {
uriField.set(destLookup, uriInfo);
URI uri =
URI.create("http://localhost:8080/lookup/v2/destination/topic/myprop/usc/ns2/topic1");
doReturn(uri).when(uriInfo).getRequestUri();
- doReturn(true).when(config).isAuthorizationEnabled();
+ config.setAuthorizationEnabled(true);
AsyncResponse asyncResponse = mock(AsyncResponse.class);
destLookup.lookupTopicAsync(asyncResponse,
TopicDomain.persistent.value(), "myprop", "usc", "ns2", "topic1", false, null,
null);
@@ -146,7 +146,7 @@ public class HttpTopicLookupv2Test {
uriField.set(destLookup, uriInfo);
URI uri =
URI.create("http://localhost:8080/lookup/v2/destination/topic/myprop/usc/ns2/topic1");
doReturn(uri).when(uriInfo).getRequestUri();
- doReturn(true).when(config).isAuthorizationEnabled();
+ config.setAuthorizationEnabled(true);
NamespaceService namespaceService = pulsar.getNamespaceService();
CompletableFuture<Boolean> future = new CompletableFuture<>();
@@ -173,7 +173,7 @@ public class HttpTopicLookupv2Test {
return CompletableFuture.completedFuture(null);
}
}
-
+
@Test
public void testNotEnoughLookupPermits() throws Exception {
@@ -190,7 +190,7 @@ public class HttpTopicLookupv2Test {
uriField.set(destLookup, uriInfo);
URI uri =
URI.create("http://localhost:8080/lookup/v2/destination/topic/myprop/usc/ns2/topic1");
doReturn(uri).when(uriInfo).getRequestUri();
- doReturn(true).when(config).isAuthorizationEnabled();
+ config.setAuthorizationEnabled(true);
AsyncResponse asyncResponse1 = mock(AsyncResponse.class);
destLookup.lookupTopicAsync(asyncResponse1,
TopicDomain.persistent.value(), "myprop", "usc", "ns2", "topic1", false,null,
null);
@@ -220,7 +220,7 @@ public class HttpTopicLookupv2Test {
uriField.setAccessible(true);
UriInfo uriInfo = mock(UriInfo.class);
uriField.set(destLookup, uriInfo);
- doReturn(false).when(config).isAuthorizationEnabled();
+ config.setAuthorizationEnabled(false);
AsyncResponse asyncResponse = mock(AsyncResponse.class);
ArgumentCaptor<RestException> arg =
ArgumentCaptor.forClass(RestException.class);
// // Test policy not found
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
index 38a1252e2ee..9e3d9e3a413 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
@@ -82,7 +82,7 @@ public class OwnershipCacheTest {
final int port = 8080;
selfBrokerUrl = "tcp://localhost:" + port;
pulsar = mock(PulsarService.class);
- config = mock(ServiceConfiguration.class);
+ config = new ServiceConfiguration();
executor =
OrderedScheduler.newSchedulerBuilder().numThreads(1).name("test").build();
zookeeperServer = new ZookeeperServerTest(0);
zookeeperServer.start();
@@ -106,8 +106,8 @@ public class OwnershipCacheTest {
doReturn(config).when(pulsar).getConfiguration();
doReturn(nsService).when(pulsar).getNamespaceService();
- doReturn(Optional.of(port)).when(config).getBrokerServicePort();
- doReturn(Optional.empty()).when(config).getWebServicePort();
+ config.setBrokerServicePort(Optional.of(port));
+ config.setWebServicePort(Optional.empty());
doReturn(brokerService).when(pulsar).getBrokerService();
doReturn(selfBrokerUrl).when(pulsar).getBrokerServiceUrl();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index 2bebac14023..4dfaade33ca 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -98,7 +98,7 @@ public class PersistentDispatcherFailoverConsumerTest {
@BeforeMethod
public void setup() throws Exception {
- ServiceConfiguration svcConfig = spy(ServiceConfiguration.class);
+ ServiceConfiguration svcConfig = new ServiceConfiguration();
svcConfig.setBrokerShutdownTimeoutMs(0L);
svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
svcConfig.setClusterName("pulsar-cluster");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
index 4bf0481e0d0..6ce6de1916f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
@@ -22,7 +22,6 @@ import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructor
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertFalse;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
@@ -78,7 +77,7 @@ public class PersistentTopicConcurrentTest extends
MockedBookKeeperTestCase {
@BeforeMethod
public void setup(Method m) throws Exception {
super.setUp(m);
- ServiceConfiguration svcConfig = spy(ServiceConfiguration.class);
+ ServiceConfiguration svcConfig = new ServiceConfiguration();
svcConfig.setBrokerShutdownTimeoutMs(0L);
svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
@Cleanup
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 665ac51446b..c4088d2e124 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -16,23 +16,20 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.pulsar.broker.service;
import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
-import static
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
-import static
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyObject;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.CALLS_REAL_METHODS;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.matches;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -44,14 +41,11 @@ import static org.testng.Assert.assertTrue;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
-import io.netty.channel.EventLoopGroup;
import io.netty.channel.embedded.EmbeddedChannel;
-import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
-import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -65,7 +59,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;
-import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
@@ -75,27 +68,22 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
-import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.TestPulsarService;
import org.apache.pulsar.broker.TransactionMetadataStoreService;
import org.apache.pulsar.broker.auth.MockAuthenticationProvider;
import org.apache.pulsar.broker.auth.MockMultiStageAuthenticationProvider;
-import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
-import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.namespace.NamespaceService;
-import org.apache.pulsar.broker.resources.NamespaceResources;
-import org.apache.pulsar.broker.resources.PulsarResources;
import
org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
import org.apache.pulsar.broker.service.ServerCnx.State;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
-import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService;
import org.apache.pulsar.broker.service.utils.ClientChannelHelper;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.AuthData;
@@ -138,17 +126,11 @@ import
org.apache.pulsar.common.protocol.Commands.ChecksumType;
import org.apache.pulsar.common.protocol.PulsarHandler;
import org.apache.pulsar.common.topics.TopicList;
import org.apache.pulsar.common.util.FutureUtil;
-import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
-import org.apache.pulsar.common.util.netty.EventLoopUtil;
-import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
-import org.apache.pulsar.metadata.impl.ZKMetadataStore;
-import org.apache.zookeeper.ZooKeeper;
import org.awaitility.Awaitility;
import org.mockito.ArgumentCaptor;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -160,12 +142,12 @@ public class ServerCnxTest {
protected EmbeddedChannel channel;
private ServiceConfiguration svcConfig;
private ServerCnx serverCnx;
+
+ protected TestPulsarService.Factory testPulsarServiceFactory;
+
+ protected PulsarService pulsar;
protected BrokerService brokerService;
- private ManagedLedgerFactory mlFactoryMock;
private ClientChannelHelper clientChannelHelper;
- private PulsarService pulsar;
- private MetadataStoreExtended store;
- private NamespaceResources namespaceResources;
protected NamespaceService namespaceService;
private final int currentProtocolVersion =
ProtocolVersion.values()[ProtocolVersion.values().length - 1]
.getValue();
@@ -175,7 +157,8 @@ public class ServerCnxTest {
private final String nonOwnedTopicName =
"persistent://prop/use/ns-abc/success-not-owned-topic";
private final String encryptionRequiredTopicName =
"persistent://prop/use/ns-abc/successEncryptionRequiredTopic";
private final String successSubName = "successSub";
- private final String nonExistentTopicName =
"persistent://nonexistent-prop/nonexistent-cluster/nonexistent-namespace/successNonExistentTopic";
+ private final String nonExistentTopicName =
+
"persistent://nonexistent-prop/nonexistent-cluster/nonexistent-namespace/successNonExistentTopic";
private final String topicWithNonLocalCluster =
"persistent://prop/usw/ns-abc/successTopic";
private final List<String> matchingTopics = Arrays.asList(
"persistent://use/ns-abc/topic-1",
@@ -188,55 +171,26 @@ public class ServerCnxTest {
private ManagedLedger ledgerMock;
private ManagedCursor cursorMock;
- private OrderedExecutor executor;
- private EventLoopGroup eventLoopGroup;
+
@BeforeMethod(alwaysRun = true)
public void setup() throws Exception {
- eventLoopGroup = new NioEventLoopGroup();
- executor = OrderedExecutor.newBuilder().numThreads(1).build();
- svcConfig = spy(ServiceConfiguration.class);
+ svcConfig = new ServiceConfiguration();
svcConfig.setBrokerShutdownTimeoutMs(0L);
svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
- svcConfig.setClusterName("pulsar-cluster");
- pulsar = spyWithClassAndConstructorArgs(PulsarService.class,
svcConfig);
- doReturn(new
DefaultSchemaRegistryService()).when(pulsar).getSchemaRegistryService();
-
svcConfig.setKeepAliveIntervalSeconds(inSec(1, TimeUnit.SECONDS));
svcConfig.setBacklogQuotaCheckEnabled(false);
- doReturn(svcConfig).when(pulsar).getConfiguration();
-
doReturn(mock(PulsarResources.class)).when(pulsar).getPulsarResources();
-
- doReturn("use").when(svcConfig).getClusterName();
-
- mlFactoryMock = mock(ManagedLedgerFactory.class);
- doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
-
- ZooKeeper mockZk = createMockZooKeeper();
- doReturn(createMockBookKeeper(executor))
- .when(pulsar).getBookKeeperClient();
-
- store = new ZKMetadataStore(mockZk);
-
- doReturn(store).when(pulsar).getLocalMetadataStore();
- doReturn(store).when(pulsar).getConfigurationMetadataStore();
+ svcConfig.setClusterName("use");
+ testPulsarServiceFactory = TestPulsarService.Factory.builder()
+ .config(svcConfig)
+ .useSpies(true)
+ .build();
+ pulsar = testPulsarServiceFactory.getPulsarService();
- brokerService = spyWithClassAndConstructorArgs(BrokerService.class,
pulsar, eventLoopGroup);
- BrokerInterceptor interceptor = mock(BrokerInterceptor.class);
- doReturn(interceptor).when(brokerService).getInterceptor();
- doReturn(brokerService).when(pulsar).getBrokerService();
-
doReturn(CompletableFuture.completedFuture(Collections.emptyMap())).when(brokerService).fetchTopicPropertiesAsync(anyObject());
+ brokerService = testPulsarServiceFactory.getBrokerService();
- doReturn(executor).when(pulsar).getOrderedExecutor();
-
- PulsarResources pulsarResources =
spyWithClassAndConstructorArgs(PulsarResources.class, store, store);
- namespaceResources =
spyWithClassAndConstructorArgs(NamespaceResources.class, store, 30);
-
doReturn(namespaceResources).when(pulsarResources).getNamespaceResources();
- doReturn(pulsarResources).when(pulsar).getPulsarResources();
-
- namespaceService = mock(NamespaceService.class);
+ namespaceService = pulsar.getNamespaceService();
doReturn(CompletableFuture.completedFuture(null)).when(namespaceService).getBundleAsync(any());
- doReturn(namespaceService).when(pulsar).getNamespaceService();
doReturn(true).when(namespaceService).isServiceUnitOwned(any());
doReturn(true).when(namespaceService).isServiceUnitActive(any());
doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).isServiceUnitActiveAsync(any());
@@ -263,14 +217,10 @@ public class ServerCnxTest {
if (channel != null) {
channel.close();
}
- brokerService.close();
- pulsar.close();
- GracefulExecutorServicesShutdown.initiate()
- .timeout(Duration.ZERO)
- .shutdown(executor)
- .handle().get();
- EventLoopUtil.shutdownGracefully(eventLoopGroup).get();
- store.close();
+ if (testPulsarServiceFactory != null) {
+ testPulsarServiceFactory.close();
+ testPulsarServiceFactory = null;
+ }
}
@Test(timeOut = 30000)
@@ -366,7 +316,8 @@ public class ServerCnxTest {
assertEquals(serverCnx.getState(), State.Start);
// test server response to CONNECT
- ByteBuf clientCommand = Commands.newConnect("none", "",
ProtocolVersion.v0.getValue(), null, null, null, null, null);
+ ByteBuf clientCommand =
+ Commands.newConnect("none", "", ProtocolVersion.v0.getValue(),
null, null, null, null, null);
channel.writeInbound(clientCommand);
assertEquals(serverCnx.getState(), State.Connected);
@@ -408,22 +359,21 @@ public class ServerCnxTest {
AuthenticationService authenticationService =
mock(AuthenticationService.class);
AuthenticationProvider authenticationProvider =
mock(AuthenticationProvider.class);
AuthenticationState authenticationState =
mock(AuthenticationState.class);
- AuthenticationDataSource authenticationDataSource =
mock(AuthenticationDataSource.class);
AuthData authData = AuthData.of(null);
doReturn(authenticationService).when(brokerService).getAuthenticationService();
doReturn(authenticationProvider).when(authenticationService).getAuthenticationProvider(Mockito.anyString());
doReturn(authenticationState).when(authenticationProvider)
- .newAuthState(Mockito.any(), Mockito.any(), Mockito.any());
+ .newAuthState(Mockito.any(), Mockito.any(), Mockito.any());
doReturn(authData).when(authenticationState)
- .authenticate(authData);
+ .authenticate(authData);
doReturn(true).when(authenticationState)
- .isComplete();
+ .isComplete();
doReturn("appid1").when(authenticationState)
- .getAuthRole();
+ .getAuthRole();
- doReturn(true).when(brokerService).isAuthenticationEnabled();
+ svcConfig.setAuthenticationEnabled(true);
resetChannel();
assertTrue(channel.isActive());
@@ -443,7 +393,7 @@ public class ServerCnxTest {
AuthenticationService authenticationService =
mock(AuthenticationService.class);
doReturn(authenticationService).when(brokerService).getAuthenticationService();
doReturn(Optional.empty()).when(authenticationService).getAnonymousUserRole();
- doReturn(true).when(brokerService).isAuthenticationEnabled();
+ svcConfig.setAuthenticationEnabled(true);
resetChannel();
assertTrue(channel.isActive());
@@ -474,7 +424,7 @@ public class ServerCnxTest {
assertTrue(channel.isActive());
assertEquals(serverCnx.getState(), State.Start);
- ByteBuf clientCommand = Commands.newConnect(authMethodName,
"pass.proxy", 1,null,
+ ByteBuf clientCommand = Commands.newConnect(authMethodName,
"pass.proxy", 1, null,
null, "client", "fail", authMethodName);
channel.writeInbound(clientCommand);
@@ -513,7 +463,8 @@ public class ServerCnxTest {
assertTrue(challenge1 instanceof CommandAuthChallenge);
// Trigger another AuthChallenge to verify that code path continues to
challenge
- ByteBuf authResponse1 = Commands.newAuthResponse(authMethodName,
AuthData.of("challenge.client".getBytes()), 1, "1");
+ ByteBuf authResponse1 =
+ Commands.newAuthResponse(authMethodName,
AuthData.of("challenge.client".getBytes()), 1, "1");
channel.writeInbound(authResponse1);
Object challenge2 = getResponse();
@@ -611,10 +562,11 @@ public class ServerCnxTest {
@Test(timeOut = 30000)
public void testProducerCommandWithAuthorizationPositive() throws
Exception {
AuthorizationService authorizationService =
mock(AuthorizationService.class);
-
doReturn(CompletableFuture.completedFuture(true)).when(authorizationService).allowTopicOperationAsync(Mockito.any(),
- Mockito.any(), Mockito.any(), Mockito.any());
+
doReturn(CompletableFuture.completedFuture(true)).when(authorizationService)
+ .allowTopicOperationAsync(Mockito.any(),
+ Mockito.any(), Mockito.any(), Mockito.any());
doReturn(authorizationService).when(brokerService).getAuthorizationService();
- doReturn(true).when(brokerService).isAuthenticationEnabled();
+ svcConfig.setAuthenticationEnabled(true);
resetChannel();
setChannelConnected();
@@ -635,16 +587,19 @@ public class ServerCnxTest {
@Test(timeOut = 30000)
public void testNonExistentTopic() throws Exception {
- AuthorizationService authorizationService =
spyWithClassAndConstructorArgs(AuthorizationService.class, svcConfig,
pulsar.getPulsarResources());
+ AuthorizationService authorizationService =
+ spyWithClassAndConstructorArgs(AuthorizationService.class,
svcConfig, pulsar.getPulsarResources());
doReturn(authorizationService).when(brokerService).getAuthorizationService();
- doReturn(true).when(brokerService).isAuthorizationEnabled();
+ svcConfig.setAuthorizationEnabled(true);
svcConfig.setAuthorizationEnabled(true);
Field providerField =
AuthorizationService.class.getDeclaredField("provider");
providerField.setAccessible(true);
- PulsarAuthorizationProvider authorizationProvider =
spyWithClassAndConstructorArgs(PulsarAuthorizationProvider.class, svcConfig,
- pulsar.getPulsarResources());
+ PulsarAuthorizationProvider authorizationProvider =
+
spyWithClassAndConstructorArgs(PulsarAuthorizationProvider.class, svcConfig,
+ pulsar.getPulsarResources());
providerField.set(authorizationService, authorizationProvider);
-
doReturn(CompletableFuture.completedFuture(false)).when(authorizationProvider).isSuperUser(Mockito.anyString(),
Mockito.any(), Mockito.any());
+
doReturn(CompletableFuture.completedFuture(false)).when(authorizationProvider)
+ .isSuperUser(Mockito.anyString(), Mockito.any(),
Mockito.any());
// Test producer creation
resetChannel();
@@ -659,7 +614,8 @@ public class ServerCnxTest {
resetChannel();
setChannelConnected();
ByteBuf newSubscribeCmd = Commands.newSubscribe(nonExistentTopicName,
//
- successSubName, 1 /* consumer id */, 1 /* request id */,
SubType.Exclusive, 0, "test" /* consumer name */, 0);
+ successSubName, 1 /* consumer id */, 1 /* request id */,
SubType.Exclusive, 0,
+ "test" /* consumer name */, 0);
channel.writeInbound(newSubscribeCmd);
assertTrue(getResponse() instanceof CommandError);
channel.finish();
@@ -668,18 +624,23 @@ public class ServerCnxTest {
@Test(timeOut = 30000)
public void testClusterAccess() throws Exception {
svcConfig.setAuthorizationEnabled(true);
- AuthorizationService authorizationService =
spyWithClassAndConstructorArgs(AuthorizationService.class, svcConfig,
pulsar.getPulsarResources());
+ AuthorizationService authorizationService =
+ spyWithClassAndConstructorArgs(AuthorizationService.class,
svcConfig, pulsar.getPulsarResources());
Field providerField =
AuthorizationService.class.getDeclaredField("provider");
providerField.setAccessible(true);
- PulsarAuthorizationProvider authorizationProvider =
spyWithClassAndConstructorArgs(PulsarAuthorizationProvider.class, svcConfig,
- pulsar.getPulsarResources());
+ PulsarAuthorizationProvider authorizationProvider =
+
spyWithClassAndConstructorArgs(PulsarAuthorizationProvider.class, svcConfig,
+ pulsar.getPulsarResources());
providerField.set(authorizationService, authorizationProvider);
doReturn(authorizationService).when(brokerService).getAuthorizationService();
- doReturn(true).when(brokerService).isAuthorizationEnabled();
-
doReturn(CompletableFuture.completedFuture(false)).when(authorizationProvider).isSuperUser(Mockito.anyString(),
Mockito.any(), Mockito.any());
-
doReturn(CompletableFuture.completedFuture(false)).when(authorizationProvider).validateTenantAdminAccess(Mockito.anyString(),
Mockito.any(), Mockito.any());
-
doReturn(CompletableFuture.completedFuture(true)).when(authorizationProvider).checkPermission(any(TopicName.class),
Mockito.anyString(),
- any(AuthAction.class));
+ svcConfig.setAuthorizationEnabled(true);
+
doReturn(CompletableFuture.completedFuture(false)).when(authorizationProvider)
+ .isSuperUser(Mockito.anyString(), Mockito.any(),
Mockito.any());
+
doReturn(CompletableFuture.completedFuture(false)).when(authorizationProvider)
+ .validateTenantAdminAccess(Mockito.anyString(), Mockito.any(),
Mockito.any());
+
doReturn(CompletableFuture.completedFuture(true)).when(authorizationProvider)
+ .checkPermission(any(TopicName.class), Mockito.anyString(),
+ any(AuthAction.class));
resetChannel();
setChannelConnected();
@@ -699,14 +660,18 @@ public class ServerCnxTest {
@Test(timeOut = 30000)
public void testNonExistentTopicSuperUserAccess() throws Exception {
- AuthorizationService authorizationService =
spyWithClassAndConstructorArgs(AuthorizationService.class, svcConfig,
pulsar.getPulsarResources());
+ AuthorizationService authorizationService =
+ spyWithClassAndConstructorArgs(AuthorizationService.class,
svcConfig, pulsar.getPulsarResources());
doReturn(authorizationService).when(brokerService).getAuthorizationService();
- doReturn(true).when(brokerService).isAuthorizationEnabled();
+ svcConfig.setAuthorizationEnabled(true);
Field providerField =
AuthorizationService.class.getDeclaredField("provider");
providerField.setAccessible(true);
- PulsarAuthorizationProvider authorizationProvider =
spyWithClassAndConstructorArgs(PulsarAuthorizationProvider.class, svcConfig,
pulsar.getPulsarResources());
+ PulsarAuthorizationProvider authorizationProvider =
+
spyWithClassAndConstructorArgs(PulsarAuthorizationProvider.class, svcConfig,
+ pulsar.getPulsarResources());
providerField.set(authorizationService, authorizationProvider);
-
doReturn(CompletableFuture.completedFuture(true)).when(authorizationProvider).isSuperUser(Mockito.anyString(),
Mockito.any(), Mockito.any());
+
doReturn(CompletableFuture.completedFuture(true)).when(authorizationProvider)
+ .isSuperUser(Mockito.anyString(), Mockito.any(),
Mockito.any());
// Test producer creation
resetChannel();
@@ -739,11 +704,12 @@ public class ServerCnxTest {
@Test(timeOut = 30000)
public void testProducerCommandWithAuthorizationNegative() throws
Exception {
AuthorizationService authorizationService =
mock(AuthorizationService.class);
-
doReturn(CompletableFuture.completedFuture(false)).when(authorizationService).allowTopicOperationAsync(Mockito.any(),
- Mockito.any(), Mockito.any(), Mockito.any());
+
doReturn(CompletableFuture.completedFuture(false)).when(authorizationService)
+ .allowTopicOperationAsync(Mockito.any(),
+ Mockito.any(), Mockito.any(), Mockito.any());
doReturn(authorizationService).when(brokerService).getAuthorizationService();
- doReturn(true).when(brokerService).isAuthenticationEnabled();
- doReturn(true).when(brokerService).isAuthorizationEnabled();
+ svcConfig.setAuthenticationEnabled(true);
+ svcConfig.setAuthorizationEnabled(true);
doReturn("prod1").when(brokerService).generateUniqueProducerName();
resetChannel();
setChannelConnected();
@@ -925,12 +891,13 @@ public class ServerCnxTest {
// Delay the topic creation in a deterministic way
CompletableFuture<Runnable> openTopicFuture = new
CompletableFuture<>();
doAnswer(invocationOnMock -> {
- openTopicFuture.complete(() -> {
- ((OpenLedgerCallback)
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
- });
+ openTopicFuture.complete(
+ () -> ((OpenLedgerCallback)
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock,
+ null));
return null;
- }).when(mlFactoryMock).asyncOpen(matches(".*success.*"),
any(ManagedLedgerConfig.class),
- any(OpenLedgerCallback.class), any(Supplier.class), any());
+ }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+ .asyncOpen(matches(".*success.*"),
any(ManagedLedgerConfig.class),
+ any(OpenLedgerCallback.class), any(Supplier.class),
any());
// In a create producer timeout from client side we expect to see this
sequence of commands :
// 1. create producer
@@ -946,7 +913,7 @@ public class ServerCnxTest {
producerName, Collections.emptyMap(), false);
channel.writeInbound(createProducer1);
- ByteBuf closeProducer = Commands.newCloseProducer(1 /* producer id */,
2 /* request id */ );
+ ByteBuf closeProducer = Commands.newCloseProducer(1 /* producer id */,
2 /* request id */);
channel.writeInbound(closeProducer);
ByteBuf createProducer2 = Commands.newProducer(successTopicName, 1 /*
producer id */, 3 /* request id */,
@@ -979,12 +946,13 @@ public class ServerCnxTest {
// Delay the topic creation in a deterministic way
CompletableFuture<Runnable> openTopicFuture = new
CompletableFuture<>();
doAnswer(invocationOnMock -> {
- openTopicFuture.complete(() -> {
- ((OpenLedgerCallback)
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
- });
+ openTopicFuture.complete(
+ () -> ((OpenLedgerCallback)
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock,
+ null));
return null;
- }).when(mlFactoryMock).asyncOpen(matches(".*success.*"),
any(ManagedLedgerConfig.class),
- any(OpenLedgerCallback.class), any(Supplier.class), any());
+ }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+ .asyncOpen(matches(".*success.*"),
any(ManagedLedgerConfig.class),
+ any(OpenLedgerCallback.class), any(Supplier.class),
any());
// In a create producer timeout from client side we expect to see this
sequence of commands :
// 1. create producer
@@ -1002,7 +970,7 @@ public class ServerCnxTest {
producerName, Collections.emptyMap(), false);
channel.writeInbound(createProducer1);
- ByteBuf closeProducer = Commands.newCloseProducer(1 /* producer id */,
2 /* request id */ );
+ ByteBuf closeProducer = Commands.newCloseProducer(1 /* producer id */,
2 /* request id */);
channel.writeInbound(closeProducer);
ByteBuf createProducer2 = Commands.newProducer(successTopicName, 1 /*
producer id */, 3 /* request id */,
@@ -1044,16 +1012,14 @@ public class ServerCnxTest {
// Delay the topic creation in a deterministic way
CountDownLatch topicCreationDelayLatch = new CountDownLatch(1);
- doAnswer(new Answer<Object>() {
- @Override
- public Object answer(InvocationOnMock invocationOnMock) throws
Throwable {
- topicCreationDelayLatch.await();
+ doAnswer((Answer<Object>) invocationOnMock -> {
+ topicCreationDelayLatch.await();
- ((OpenLedgerCallback)
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
- return null;
- }
- }).when(mlFactoryMock).asyncOpen(matches(".*success.*"),
any(ManagedLedgerConfig.class),
- any(OpenLedgerCallback.class), any(Supplier.class), any());
+ ((OpenLedgerCallback)
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
+ return null;
+ }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+ .asyncOpen(matches(".*success.*"),
any(ManagedLedgerConfig.class),
+ any(OpenLedgerCallback.class), any(Supplier.class),
any());
// In a create producer timeout from client side we expect to see this
sequence of commands :
// 1. create producer
@@ -1069,7 +1035,7 @@ public class ServerCnxTest {
producerName, Collections.emptyMap(), false);
channel.writeInbound(createProducer1);
- ByteBuf closeProducer1 = Commands.newCloseProducer(1 /* producer id
*/, 2 /* request id */ );
+ ByteBuf closeProducer1 = Commands.newCloseProducer(1 /* producer id
*/, 2 /* request id */);
channel.writeInbound(closeProducer1);
ByteBuf createProducer2 = Commands.newProducer(successTopicName, 1 /*
producer id */, 3 /* request id */,
@@ -1126,12 +1092,13 @@ public class ServerCnxTest {
// Delay the topic creation in a deterministic way
CompletableFuture<Runnable> openFailedTopic = new
CompletableFuture<>();
doAnswer(invocationOnMock -> {
- openFailedTopic.complete(() -> {
- ((OpenLedgerCallback)
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
- });
+ openFailedTopic.complete(
+ () -> ((OpenLedgerCallback)
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock,
+ null));
return null;
- }).when(mlFactoryMock).asyncOpen(matches(".*fail.*"),
any(ManagedLedgerConfig.class),
- any(OpenLedgerCallback.class), any(Supplier.class), any());
+ }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+ .asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class),
+ any(OpenLedgerCallback.class), any(Supplier.class),
any());
// In a create producer timeout from client side we expect to see this
sequence of commands :
// 1. create a failure producer which will timeout creation after
100msec
@@ -1148,7 +1115,7 @@ public class ServerCnxTest {
producerName, Collections.emptyMap(), false);
channel.writeInbound(createProducer1);
- ByteBuf closeProducer = Commands.newCloseProducer(1 /* producer id */,
2 /* request id */ );
+ ByteBuf closeProducer = Commands.newCloseProducer(1 /* producer id */,
2 /* request id */);
channel.writeInbound(closeProducer);
ByteBuf createProducer2 = Commands.newProducer(successTopicName, 1 /*
producer id */, 3 /* request id */,
@@ -1196,13 +1163,14 @@ public class ServerCnxTest {
// Delay the topic creation in a deterministic way
CompletableFuture<Runnable> openTopicTask = new CompletableFuture<>();
doAnswer(invocationOnMock -> {
- openTopicTask.complete(() -> {
- ((OpenLedgerCallback)
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
- });
+ openTopicTask.complete(
+ () -> ((OpenLedgerCallback)
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock,
+ null));
return null;
- }).when(mlFactoryMock).asyncOpen(matches(".*success.*"),
any(ManagedLedgerConfig.class),
- any(OpenLedgerCallback.class), any(Supplier.class), any());
+ }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+ .asyncOpen(matches(".*success.*"),
any(ManagedLedgerConfig.class),
+ any(OpenLedgerCallback.class), any(Supplier.class),
any());
// In a subscribe timeout from client side we expect to see this
sequence of commands :
// 1. Subscribe
@@ -1271,22 +1239,22 @@ public class ServerCnxTest {
// Delay the topic creation in a deterministic way
CompletableFuture<Runnable> openTopicSuccess = new
CompletableFuture<>();
doAnswer(invocationOnMock -> {
- openTopicSuccess.complete(() -> {
- ((OpenLedgerCallback)
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
- });
+ openTopicSuccess.complete(
+ () -> ((OpenLedgerCallback)
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock,
+ null));
return null;
- }).when(mlFactoryMock).asyncOpen(matches(".*success.*"),
any(ManagedLedgerConfig.class),
- any(OpenLedgerCallback.class), any(Supplier.class), any());
+ }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+ .asyncOpen(matches(".*success.*"),
any(ManagedLedgerConfig.class),
+ any(OpenLedgerCallback.class), any(Supplier.class),
any());
CompletableFuture<Runnable> openTopicFail = new CompletableFuture<>();
doAnswer(invocationOnMock -> {
- openTopicFail.complete(() -> {
- ((OpenLedgerCallback) invocationOnMock.getArguments()[2])
- .openLedgerFailed(new ManagedLedgerException("Managed
ledger failure"), null);
- });
+ openTopicFail.complete(() -> ((OpenLedgerCallback)
invocationOnMock.getArguments()[2])
+ .openLedgerFailed(new ManagedLedgerException("Managed
ledger failure"), null));
return null;
- }).when(mlFactoryMock).asyncOpen(matches(".*fail.*"),
any(ManagedLedgerConfig.class),
- any(OpenLedgerCallback.class), any(Supplier.class), any());
+ }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+ .asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class),
+ any(OpenLedgerCallback.class), any(Supplier.class),
any());
// In a subscribe timeout from client side we expect to see this
sequence of commands :
// 1. Subscribe against failtopic which will fail after 100msec
@@ -1301,7 +1269,7 @@ public class ServerCnxTest {
"test" /* consumer name */, 0 /* avoid reseting cursor */);
channel.writeInbound(subscribe1);
- ByteBuf closeConsumer = Commands.newCloseConsumer(1 /* consumer id */,
2 /* request id */ );
+ ByteBuf closeConsumer = Commands.newCloseConsumer(1 /* consumer id */,
2 /* request id */);
channel.writeInbound(closeConsumer);
ByteBuf subscribe2 = Commands.newSubscribe(successTopicName, //
@@ -1352,8 +1320,8 @@ public class ServerCnxTest {
resetChannel();
setChannelConnected();
- doReturn(false).when(brokerService).isAuthenticationEnabled();
- doReturn(false).when(brokerService).isAuthorizationEnabled();
+ svcConfig.setAuthenticationEnabled(false);
+ svcConfig.setAuthorizationEnabled(false);
// test SUBSCRIBE on topic and cursor creation success
ByteBuf clientCommand = Commands.newSubscribe(successTopicName, //
successSubName, 1 /* consumer id */, 1 /* request id */,
SubType.Exclusive, 0,
@@ -1392,8 +1360,8 @@ public class ServerCnxTest {
resetChannel();
setChannelConnected();
setConnectionVersion(ProtocolVersion.v3.getValue());
- doReturn(false).when(brokerService).isAuthenticationEnabled();
- doReturn(false).when(brokerService).isAuthorizationEnabled();
+ svcConfig.setAuthenticationEnabled(false);
+ svcConfig.setAuthorizationEnabled(false);
// test SUBSCRIBE on topic and cursor creation success
ByteBuf clientCommand = Commands.newSubscribe(successTopicName, //
successSubName, 1 /* consumer id */, 1 /* request id */,
SubType.Exclusive, 0 /* priority */,
@@ -1420,11 +1388,12 @@ public class ServerCnxTest {
@Test(timeOut = 30000)
public void testSubscribeCommandWithAuthorizationPositive() throws
Exception {
AuthorizationService authorizationService =
mock(AuthorizationService.class);
-
doReturn(CompletableFuture.completedFuture(true)).when(authorizationService).allowTopicOperationAsync(Mockito.any(),
- Mockito.any(), Mockito.any(), Mockito.any());
+
doReturn(CompletableFuture.completedFuture(true)).when(authorizationService)
+ .allowTopicOperationAsync(Mockito.any(),
+ Mockito.any(), Mockito.any(), Mockito.any());
doReturn(authorizationService).when(brokerService).getAuthorizationService();
- doReturn(true).when(brokerService).isAuthenticationEnabled();
- doReturn(true).when(brokerService).isAuthorizationEnabled();
+ svcConfig.setAuthenticationEnabled(true);
+ svcConfig.setAuthorizationEnabled(true);
resetChannel();
setChannelConnected();
@@ -1442,18 +1411,20 @@ public class ServerCnxTest {
@Test(timeOut = 30000)
public void testSubscribeCommandWithAuthorizationNegative() throws
Exception {
AuthorizationService authorizationService =
mock(AuthorizationService.class);
-
doReturn(CompletableFuture.completedFuture(false)).when(authorizationService).allowTopicOperationAsync(Mockito.any(),
- Mockito.any(), Mockito.any(), Mockito.any());
+
doReturn(CompletableFuture.completedFuture(false)).when(authorizationService)
+ .allowTopicOperationAsync(Mockito.any(),
+ Mockito.any(), Mockito.any(), Mockito.any());
doReturn(authorizationService).when(brokerService).getAuthorizationService();
- doReturn(true).when(brokerService).isAuthenticationEnabled();
- doReturn(true).when(brokerService).isAuthorizationEnabled();
+ svcConfig.setAuthenticationEnabled(true);
+ svcConfig.setAuthorizationEnabled(true);
resetChannel();
setChannelConnected();
// test SUBSCRIBE on topic and cursor creation success
ByteBuf clientCommand = Commands.newSubscribe(successTopicName, //
- successSubName, 1 /* consumer id */, 1 /* request id */,
SubType.Exclusive, 0, "test" /* consumer name */, 0 /*avoid reseting cursor*/);
+ successSubName, 1 /* consumer id */, 1 /* request id */,
SubType.Exclusive, 0,
+ "test" /* consumer name */, 0 /*avoid reseting cursor*/);
channel.writeInbound(clientCommand);
assertTrue(getResponse() instanceof CommandError);
@@ -1467,15 +1438,16 @@ public class ServerCnxTest {
ByteBuf clientCommand = Commands.newSubscribe(successTopicName,
successSubName, 1 /* consumer id */,
1 /*
- * request id
- */, SubType.Exclusive, 0, "test" /* consumer name */, 0
/*avoid reseting cursor*/);
+ * request id
+ */, SubType.Exclusive, 0, "test" /* consumer name */, 0
/*avoid reseting cursor*/);
channel.writeInbound(clientCommand);
assertTrue(getResponse() instanceof CommandSuccess);
PositionImpl pos = new PositionImpl(0, 0);
- clientCommand = Commands.newAck(1 /* consumer id */,
pos.getLedgerId(), pos.getEntryId(), null, AckType.Individual,
- null, Collections.emptyMap(), -1);
+ clientCommand =
+ Commands.newAck(1 /* consumer id */, pos.getLedgerId(),
pos.getEntryId(), null, AckType.Individual,
+ null, Collections.emptyMap(), -1);
channel.writeInbound(clientCommand);
// verify nothing is sent out on the wire after ack
@@ -1522,12 +1494,14 @@ public class ServerCnxTest {
// add `clusterDispatchRate` otherwise there will be a NPE
//
`org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceReplicatorDispatchRate`
policies.replicatorDispatchRate = new HashMap<>();
-
doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(namespaceResources)
-
.getPoliciesAsync(TopicName.get(encryptionRequiredTopicName).getNamespaceObject());
+ testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
+
.createPolicies(TopicName.get(encryptionRequiredTopicName).getNamespaceObject(),
+ policies);
// test success case: encrypted producer can connect
- ByteBuf clientCommand =
Commands.newProducer(encryptionRequiredTopicName, 1 /* producer id */, 1 /*
request id */,
- "encrypted-producer", true, Collections.emptyMap(), false);
+ ByteBuf clientCommand =
+ Commands.newProducer(encryptionRequiredTopicName, 1 /*
producer id */, 1 /* request id */,
+ "encrypted-producer", true, Collections.emptyMap(),
false);
channel.writeInbound(clientCommand);
Object response = getResponse();
@@ -1558,12 +1532,14 @@ public class ServerCnxTest {
// add `clusterDispatchRate` otherwise there will be a NPE
//
`org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceReplicatorDispatchRate`
policies.replicatorDispatchRate = new HashMap<>();
-
doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(namespaceResources)
-
.getPoliciesAsync(TopicName.get(encryptionRequiredTopicName).getNamespaceObject());
+ testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
+
.createPolicies(TopicName.get(encryptionRequiredTopicName).getNamespaceObject(),
+ policies);
// test failure case: unencrypted producer cannot connect
- ByteBuf clientCommand =
Commands.newProducer(encryptionRequiredTopicName, 2 /* producer id */, 2 /*
request id */,
- "unencrypted-producer", false, Collections.emptyMap(), false);
+ ByteBuf clientCommand =
+ Commands.newProducer(encryptionRequiredTopicName, 2 /*
producer id */, 2 /* request id */,
+ "unencrypted-producer", false, Collections.emptyMap(),
false);
channel.writeInbound(clientCommand);
Object response = getResponse();
@@ -1598,12 +1574,14 @@ public class ServerCnxTest {
// add `clusterDispatchRate` otherwise there will be a NPE
//
`org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceReplicatorDispatchRate`
policies.replicatorDispatchRate = new HashMap<>();
-
doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(namespaceResources)
-
.getPoliciesAsync(TopicName.get(encryptionRequiredTopicName).getNamespaceObject());
+ testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
+
.createPolicies(TopicName.get(encryptionRequiredTopicName).getNamespaceObject(),
+ policies);
// test failure case: unencrypted producer cannot connect
- ByteBuf clientCommand =
Commands.newProducer(encryptionRequiredTopicName, 2 /* producer id */, 2 /*
request id */,
- "unencrypted-producer", false, Collections.emptyMap(), false);
+ ByteBuf clientCommand =
+ Commands.newProducer(encryptionRequiredTopicName, 2 /*
producer id */, 2 /* request id */,
+ "unencrypted-producer", false, Collections.emptyMap(),
false);
channel.writeInbound(clientCommand);
Object response = getResponse();
@@ -1636,11 +1614,13 @@ public class ServerCnxTest {
// add `clusterDispatchRate` otherwise there will be a NPE
//
`org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceReplicatorDispatchRate`
policies.replicatorDispatchRate = new HashMap<>();
-
doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(namespaceResources)
-
.getPoliciesAsync(TopicName.get(encryptionRequiredTopicName).getNamespaceObject());
+ testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
+
.createPolicies(TopicName.get(encryptionRequiredTopicName).getNamespaceObject(),
+ policies);
- ByteBuf clientCommand =
Commands.newProducer(encryptionRequiredTopicName, 1 /* producer id */, 1 /*
request id */,
- "prod-name", true, Collections.emptyMap(), false);
+ ByteBuf clientCommand =
+ Commands.newProducer(encryptionRequiredTopicName, 1 /*
producer id */, 1 /* request id */,
+ "prod-name", true, Collections.emptyMap(), false);
channel.writeInbound(clientCommand);
assertTrue(getResponse() instanceof CommandProducerSuccess);
@@ -1680,11 +1660,13 @@ public class ServerCnxTest {
// add `clusterDispatchRate` otherwise there will be a NPE
//
`org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceReplicatorDispatchRate`
policies.replicatorDispatchRate = new HashMap<>();
-
doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(namespaceResources)
-
.getPoliciesAsync(TopicName.get(encryptionRequiredTopicName).getNamespaceObject());
+ testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
+
.createPolicies(TopicName.get(encryptionRequiredTopicName).getNamespaceObject(),
+ policies);
- ByteBuf clientCommand =
Commands.newProducer(encryptionRequiredTopicName, 1 /* producer id */, 1 /*
request id */,
- "prod-name", true, Collections.emptyMap(), false);
+ ByteBuf clientCommand =
+ Commands.newProducer(encryptionRequiredTopicName, 1 /*
producer id */, 1 /* request id */,
+ "prod-name", true, Collections.emptyMap(), false);
channel.writeInbound(clientCommand);
assertTrue(getResponse() instanceof CommandProducerSuccess);
@@ -1751,18 +1733,16 @@ public class ServerCnxTest {
private void setupMLAsyncCallbackMocks() {
ledgerMock = mock(ManagedLedger.class);
cursorMock = mock(ManagedCursor.class);
- doReturn(new ArrayList<Object>()).when(ledgerMock).getCursors();
+ doReturn(new ArrayList<>()).when(ledgerMock).getCursors();
// call openLedgerComplete with ledgerMock on ML factory asyncOpen
- doAnswer(new Answer<Object>() {
- @Override
- public Object answer(InvocationOnMock invocationOnMock) throws
Throwable {
- Thread.sleep(300);
- ((OpenLedgerCallback)
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
- return null;
- }
- }).when(mlFactoryMock).asyncOpen(matches(".*success.*"),
any(ManagedLedgerConfig.class),
- any(OpenLedgerCallback.class), any(Supplier.class), any());
+ doAnswer((Answer<Object>) invocationOnMock -> {
+ Thread.sleep(300);
+ ((OpenLedgerCallback)
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
+ return null;
+ }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+ .asyncOpen(matches(".*success.*"),
any(ManagedLedgerConfig.class),
+ any(OpenLedgerCallback.class), any(Supplier.class),
any());
// call openLedgerFailed on ML factory asyncOpen
doAnswer((Answer<Object>) invocationOnMock -> {
@@ -1771,19 +1751,17 @@ public class ServerCnxTest {
.openLedgerFailed(new ManagedLedgerException("Managed
ledger failure"), null)).start();
return null;
- }).when(mlFactoryMock).asyncOpen(matches(".*fail.*"),
any(ManagedLedgerConfig.class),
- any(OpenLedgerCallback.class), any(Supplier.class), any());
+ }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+ .asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class),
+ any(OpenLedgerCallback.class), any(Supplier.class),
any());
// call addComplete on ledger asyncAddEntry
- doAnswer(new Answer<Object>() {
- @Override
- public Object answer(InvocationOnMock invocationOnMock) throws
Throwable {
- ((AddEntryCallback)
invocationOnMock.getArguments()[1]).addComplete(
- new PositionImpl(-1, -1),
- null,
- invocationOnMock.getArguments()[2]);
- return null;
- }
+ doAnswer((Answer<Object>) invocationOnMock -> {
+ ((AddEntryCallback)
invocationOnMock.getArguments()[1]).addComplete(
+ new PositionImpl(-1, -1),
+ null,
+ invocationOnMock.getArguments()[2]);
+ return null;
}).when(ledgerMock).asyncAddEntry(any(ByteBuf.class),
any(AddEntryCallback.class), any());
doAnswer((Answer<Object>) invocationOnMock ->
true).when(cursorMock).isDurable();
@@ -1792,29 +1770,34 @@ public class ServerCnxTest {
Thread.sleep(300);
((OpenCursorCallback)
invocationOnMock.getArguments()[3]).openCursorComplete(cursorMock, null);
return null;
- }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"),
any(InitialPosition.class), any(OpenCursorCallback.class), any());
+ }).when(ledgerMock)
+ .asyncOpenCursor(matches(".*success.*"),
any(InitialPosition.class), any(OpenCursorCallback.class),
+ any());
doAnswer((Answer<Object>) invocationOnMock -> {
Thread.sleep(300);
((OpenCursorCallback)
invocationOnMock.getArguments()[4]).openCursorComplete(cursorMock, null);
return null;
- }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"),
any(InitialPosition.class), any(Map.class), any(Map.class),
- any(OpenCursorCallback.class), any());
+ }).when(ledgerMock)
+ .asyncOpenCursor(matches(".*success.*"),
any(InitialPosition.class), any(Map.class), any(Map.class),
+ any(OpenCursorCallback.class), any());
doAnswer((Answer<Object>) invocationOnMock -> {
Thread.sleep(300);
((OpenCursorCallback) invocationOnMock.getArguments()[3])
.openCursorFailed(new ManagedLedgerException("Managed
ledger failure"), null);
return null;
- }).when(ledgerMock).asyncOpenCursor(matches(".*fail.*"),
any(InitialPosition.class), any(OpenCursorCallback.class), any());
+ }).when(ledgerMock)
+ .asyncOpenCursor(matches(".*fail.*"),
any(InitialPosition.class), any(OpenCursorCallback.class), any());
doAnswer((Answer<Object>) invocationOnMock -> {
Thread.sleep(300);
((OpenCursorCallback) invocationOnMock.getArguments()[3])
.openCursorFailed(new ManagedLedgerException("Managed
ledger failure"), null);
return null;
- }).when(ledgerMock).asyncOpenCursor(matches(".*fail.*"),
any(InitialPosition.class), any(Map.class), any(Map.class),
- any(OpenCursorCallback.class), any());
+ }).when(ledgerMock)
+ .asyncOpenCursor(matches(".*fail.*"),
any(InitialPosition.class), any(Map.class), any(Map.class),
+ any(OpenCursorCallback.class), any());
doAnswer((Answer<Object>) invocationOnMock -> {
((DeleteCursorCallback)
invocationOnMock.getArguments()[1]).deleteCursorComplete(null);
@@ -2085,7 +2068,7 @@ public class ServerCnxTest {
}
@Test
- public void testNeverDelayConsumerFutureWhenNotFail() throws Exception{
+ public void testNeverDelayConsumerFutureWhenNotFail() throws Exception {
// Mock ServerCnx.field: consumers
ConcurrentLongHashMap.Builder mapBuilder =
Mockito.mock(ConcurrentLongHashMap.Builder.class);
Mockito.when(mapBuilder.expectedItems(Mockito.anyInt())).thenReturn(mapBuilder);
@@ -2100,12 +2083,12 @@ public class ServerCnxTest {
// case2: exists existingConsumerFuture, delay complete after execute
'isDone()' many times
// Why is the design so complicated, see:
https://github.com/apache/pulsar/pull/15051
// Try a delay of 3 stages. The simulation is successful after
repeated judgments.
- for(AtomicInteger futureWillDoneAfterDelayTimes = new AtomicInteger(1);
-
futureWillDoneAfterDelayTimes.intValue() <= 3;
-
futureWillDoneAfterDelayTimes.incrementAndGet()){
+ for (AtomicInteger futureWillDoneAfterDelayTimes = new
AtomicInteger(1);
+ futureWillDoneAfterDelayTimes.intValue() <= 3;
+ futureWillDoneAfterDelayTimes.incrementAndGet()) {
final AtomicInteger futureCallTimes = new AtomicInteger();
final Consumer mockConsumer = Mockito.mock(Consumer.class);
- CompletableFuture existingConsumerFuture = new
CompletableFuture<Consumer>(){
+ CompletableFuture existingConsumerFuture = new
CompletableFuture<Consumer>() {
private boolean complete;
@@ -2121,27 +2104,27 @@ public class ServerCnxTest {
// if trig "getNow()", then complete
@Override
- public Consumer get(){
+ public Consumer get() {
complete = true;
return mockConsumer;
}
// if trig "get()", then complete
@Override
- public Consumer get(long timeout, TimeUnit unit){
+ public Consumer get(long timeout, TimeUnit unit) {
complete = true;
return mockConsumer;
}
// if trig "get()", then complete
@Override
- public Consumer getNow(Consumer ifAbsent){
+ public Consumer getNow(Consumer ifAbsent) {
complete = true;
return mockConsumer;
}
// never fail
- public boolean isCompletedExceptionally(){
+ public boolean isCompletedExceptionally() {
return false;
}
};
@@ -2155,8 +2138,8 @@ public class ServerCnxTest {
resetChannel();
setChannelConnected();
// auth check disable
- doReturn(false).when(brokerService).isAuthenticationEnabled();
- doReturn(false).when(brokerService).isAuthorizationEnabled();
+ svcConfig.setAuthenticationEnabled(false);
+ svcConfig.setAuthorizationEnabled(false);
// do subscribe
ByteBuf clientCommand =
Commands.newSubscribe(successTopicName, //
successSubName, 1 /* consumer id */, 1 /* request id
*/, SubType.Exclusive, 0,
@@ -2200,8 +2183,8 @@ public class ServerCnxTest {
resetChannel();
setChannelConnected();
// auth check disable
- doReturn(false).when(brokerService).isAuthenticationEnabled();
- doReturn(false).when(brokerService).isAuthorizationEnabled();
+ svcConfig.setAuthenticationEnabled(false);
+ svcConfig.setAuthorizationEnabled(false);
// do subscribe
ByteBuf clientCommand = Commands.newSubscribe(successTopicName, //
successSubName, 1 /* consumer id */, 1 /* request id */,
SubType.Exclusive, 0,
@@ -2570,7 +2553,8 @@ public class ServerCnxTest {
setChannelConnected();
Topic topic = mock(Topic.class);
doReturn(CompletableFuture.completedFuture(null)).when(topic).endTxn(any(TxnID.class),
anyInt(), anyLong());
-
doReturn(CompletableFuture.completedFuture(Optional.of(topic))).when(brokerService).getTopicIfExists(any(String.class));
+
doReturn(CompletableFuture.completedFuture(Optional.of(topic))).when(brokerService)
+ .getTopicIfExists(any(String.class));
ByteBuf clientCommand = Commands.newEndTxnOnPartition(89L, 1L, 12L,
successTopicName, TxnAction.COMMIT, 1L);
channel.writeInbound(clientCommand);
@@ -2596,8 +2580,10 @@ public class ServerCnxTest {
resetChannel();
setChannelConnected();
Topic topic = mock(Topic.class);
- doReturn(CompletableFuture.failedFuture(new RuntimeException("server
error"))).when(topic).endTxn(any(TxnID.class), anyInt(), anyLong());
-
doReturn(CompletableFuture.completedFuture(Optional.of(topic))).when(brokerService).getTopicIfExists(any(String.class));
+ doReturn(CompletableFuture.failedFuture(new RuntimeException("server
error"))).when(topic)
+ .endTxn(any(TxnID.class), anyInt(), anyLong());
+
doReturn(CompletableFuture.completedFuture(Optional.of(topic))).when(brokerService)
+ .getTopicIfExists(any(String.class));
ByteBuf clientCommand = Commands.newEndTxnOnPartition(89L, 1L, 12L,
successTopicName, TxnAction.COMMIT, 1L);
channel.writeInbound(clientCommand);
@@ -2623,11 +2609,13 @@ public class ServerCnxTest {
resetChannel();
setChannelConnected();
Topic topic = mock(Topic.class);
- final org.apache.pulsar.broker.service.Subscription sub =
mock(org.apache.pulsar.broker.service.Subscription.class);
+ final org.apache.pulsar.broker.service.Subscription sub =
+ mock(org.apache.pulsar.broker.service.Subscription.class);
doReturn(sub).when(topic).getSubscription(any());
doReturn(CompletableFuture.completedFuture(null))
.when(sub).endTxn(anyLong(), anyLong(), anyInt(), anyLong());
-
doReturn(CompletableFuture.completedFuture(Optional.of(topic))).when(brokerService).getTopicIfExists(any(String.class));
+
doReturn(CompletableFuture.completedFuture(Optional.of(topic))).when(brokerService)
+ .getTopicIfExists(any(String.class));
ByteBuf clientCommand = Commands.newEndTxnOnSubscription(89L, 1L, 12L,
successTopicName, successSubName, TxnAction.COMMIT, 1L);
@@ -2656,11 +2644,13 @@ public class ServerCnxTest {
setChannelConnected();
Topic topic = mock(Topic.class);
- final org.apache.pulsar.broker.service.Subscription sub =
mock(org.apache.pulsar.broker.service.Subscription.class);
+ final org.apache.pulsar.broker.service.Subscription sub =
+ mock(org.apache.pulsar.broker.service.Subscription.class);
doReturn(sub).when(topic).getSubscription(any());
doReturn(CompletableFuture.failedFuture(new RuntimeException("server
error")))
.when(sub).endTxn(anyLong(), anyLong(), anyInt(), anyLong());
-
doReturn(CompletableFuture.completedFuture(Optional.of(topic))).when(brokerService).getTopicIfExists(any(String.class));
+
doReturn(CompletableFuture.completedFuture(Optional.of(topic))).when(brokerService)
+ .getTopicIfExists(any(String.class));
ByteBuf clientCommand = Commands.newEndTxnOnSubscription(89L, 1L, 12L,
successTopicName, successSubName, TxnAction.COMMIT, 1L);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
index e869d71e80c..9f913795b2a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
@@ -27,7 +27,6 @@ import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@@ -113,7 +112,7 @@ public class PersistentSubscriptionTest {
executor =
OrderedExecutor.newBuilder().numThreads(1).name("persistent-subscription-test").build();
eventLoopGroup = new NioEventLoopGroup();
- ServiceConfiguration svcConfig = spy(ServiceConfiguration.class);
+ ServiceConfiguration svcConfig = new ServiceConfiguration();
svcConfig.setBrokerShutdownTimeoutMs(0L);
svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
svcConfig.setTransactionCoordinatorEnabled(true);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/ProcessHandlerFilterTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/ProcessHandlerFilterTest.java
index cc427bcfd6a..8d9c514c4f6 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/ProcessHandlerFilterTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/ProcessHandlerFilterTest.java
@@ -40,11 +40,11 @@ public class ProcessHandlerFilterTest {
BrokerInterceptor spyInterceptor =
Mockito.spy(BrokerInterceptor.class);
HttpServletRequest mockHttpServletRequest =
Mockito.mock(HttpServletRequest.class);
HttpServletResponse mockHttpServletResponse =
Mockito.mock(HttpServletResponse.class);
- ServiceConfiguration mockConfig =
Mockito.mock(ServiceConfiguration.class);
+ ServiceConfiguration config = new ServiceConfiguration();
FilterChain mockFilterChain = Mockito.mock(FilterChain.class);
Mockito.doReturn(spyInterceptor).when(mockPulsarService).getBrokerInterceptor();
- Mockito.doReturn(mockConfig).when(mockPulsarService).getConfig();
- Mockito.doReturn(Sets.newHashSet("Interceptor1",
"Interceptor2")).when(mockConfig).getBrokerInterceptors();
+ Mockito.doReturn(config).when(mockPulsarService).getConfig();
+ config.setBrokerInterceptors(Sets.newHashSet("Interceptor1",
"Interceptor2"));
ProcessHandlerFilter processHandlerFilter = new
ProcessHandlerFilter(mockPulsarService);
processHandlerFilter.doFilter(mockHttpServletRequest,
mockHttpServletResponse, mockFilterChain);
Mockito.verify(spyInterceptor).onFilter(mockHttpServletRequest,
mockHttpServletResponse, mockFilterChain);
@@ -55,11 +55,11 @@ public class ProcessHandlerFilterTest {
PulsarService mockPulsarService = Mockito.mock(PulsarService.class);
BrokerInterceptor spyInterceptor =
Mockito.mock(BrokerInterceptor.class);
HttpServletResponse mockHttpServletResponse =
Mockito.mock(HttpServletResponse.class);
- ServiceConfiguration mockConfig =
Mockito.mock(ServiceConfiguration.class);
+ ServiceConfiguration config = new ServiceConfiguration();
FilterChain spyFilterChain = Mockito.spy(FilterChain.class);
Mockito.doReturn(spyInterceptor).when(mockPulsarService).getBrokerInterceptor();
- Mockito.doReturn(mockConfig).when(mockPulsarService).getConfig();
- Mockito.doReturn(new
HashSet<>()).when(mockConfig).getBrokerInterceptors();
+ Mockito.doReturn(config).when(mockPulsarService).getConfig();
+ config.setBrokerInterceptors(new HashSet<>());
// empty interceptor list
HttpServletRequest mockHttpServletRequest =
Mockito.mock(HttpServletRequest.class);
ProcessHandlerFilter processHandlerFilter = new
ProcessHandlerFilter(mockPulsarService);
@@ -67,7 +67,7 @@ public class ProcessHandlerFilterTest {
Mockito.verify(spyFilterChain).doFilter(mockHttpServletRequest,
mockHttpServletResponse);
Mockito.clearInvocations(spyFilterChain);
// request has MULTIPART_FORM_DATA content-type
-
Mockito.doReturn(Sets.newHashSet("Interceptor1","Interceptor2")).when(mockConfig).getBrokerInterceptors();
+
config.setBrokerInterceptors(Sets.newHashSet("Interceptor1","Interceptor2"));
HttpServletRequest mockHttpServletRequest2 =
Mockito.mock(HttpServletRequest.class);
Mockito.doReturn(MediaType.MULTIPART_FORM_DATA).when(mockHttpServletRequest2).getContentType();
ProcessHandlerFilter processHandlerFilter2 = new
ProcessHandlerFilter(mockPulsarService);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java
index f5c3c1de1c2..6b3b05405ba 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java
@@ -19,12 +19,10 @@
package org.apache.pulsar.client.api;
import static
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
-import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
-
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -81,7 +79,7 @@ public class ClientDeduplicationFailureTest {
bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
bkEnsemble.start();
- config = spy(ServiceConfiguration.class);
+ config = new ServiceConfiguration();
config.setClusterName("use");
config.setWebServicePort(Optional.of(0));
config.setMetadataStoreUrl("zk:127.0.0.1:" +
bkEnsemble.getZookeeperPort());