This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d9a71d7ff4e [improve][test] Refactor TestPulsarService to
PulsarTestContext and add support for starting (#19337)
d9a71d7ff4e is described below
commit d9a71d7ff4e9b251ed51d3e3154b992dc58f6c74
Author: Lari Hotari <[email protected]>
AuthorDate: Mon Jan 30 20:51:39 2023 +0200
[improve][test] Refactor TestPulsarService to PulsarTestContext and add
support for starting (#19337)
---
.../apache/pulsar/broker/TestPulsarService.java | 501 ---------------------
.../OwnerShipForCurrentServerTestBase.java | 180 +-------
.../broker/service/MessageCumulativeAckTest.java | 16 +-
.../PersistentDispatcherFailoverConsumerTest.java | 38 +-
.../pulsar/broker/service/PersistentTopicTest.java | 208 +++++----
.../broker/service/ServerCnxAuthorizationTest.java | 30 +-
.../pulsar/broker/service/ServerCnxTest.java | 46 +-
...herFailoverConsumerStreamingDispatcherTest.java | 2 +-
.../PersistentTopicStreamingDispatcherTest.java | 2 +-
.../testcontext/AbstractTestPulsarService.java | 88 ++++
.../testcontext/MockBookKeeperClientFactory.java | 60 +++
.../NonClosableMockBookKeeper.java} | 35 +-
.../broker/testcontext/NonClosingProxyHandler.java | 73 +++
.../testcontext/NonStartableTestPulsarService.java | 196 ++++++++
.../broker/testcontext/PulsarTestContext.java | 411 +++++++++++++++++
.../pulsar/broker/testcontext/SpyConfig.java | 83 ++++
.../testcontext/StartableTestPulsarService.java | 37 ++
17 files changed, 1180 insertions(+), 826 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/TestPulsarService.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TestPulsarService.java
deleted file mode 100644
index 95667f95a33..00000000000
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/TestPulsarService.java
+++ /dev/null
@@ -1,501 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.pulsar.broker;
-
-import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
-import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgsRecordingInvocations;
-import static org.mockito.Mockito.mock;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import java.io.IOException;
-import java.time.Duration;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Function;
-import java.util.function.Supplier;
-import lombok.AccessLevel;
-import lombok.Builder;
-import lombok.Getter;
-import lombok.Singular;
-import lombok.ToString;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.PulsarMockBookKeeper;
-import org.apache.bookkeeper.common.util.OrderedExecutor;
-import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
-import org.apache.bookkeeper.stats.NullStatsProvider;
-import org.apache.bookkeeper.stats.StatsProvider;
-import org.apache.pulsar.broker.namespace.NamespaceService;
-import org.apache.pulsar.broker.resources.NamespaceResources;
-import org.apache.pulsar.broker.resources.PulsarResources;
-import org.apache.pulsar.broker.resources.TopicResources;
-import org.apache.pulsar.broker.service.BrokerService;
-import org.apache.pulsar.broker.service.PulsarMetadataEventSynchronizer;
-import org.apache.pulsar.broker.service.ServerCnx;
-import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService;
-import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
-import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown;
-import org.apache.pulsar.compaction.Compactor;
-import org.apache.pulsar.metadata.api.MetadataStore;
-import org.apache.pulsar.metadata.api.MetadataStoreConfig;
-import org.apache.pulsar.metadata.api.MetadataStoreException;
-import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
-import org.apache.pulsar.metadata.impl.MetadataStoreFactoryImpl;
-import org.jetbrains.annotations.NotNull;
-
-/**
- * Subclass of PulsarService that is used for some tests.
- * This was written as a replacement for the previous Mockito Spy over
PulsarService solution which caused
- * a flaky test issue https://github.com/apache/pulsar/issues/13620.
- */
-
-public class TestPulsarService extends PulsarService {
-
-
- @Slf4j
- @ToString
- @Getter
- @Builder
- public static class Factory implements AutoCloseable {
- private final ServiceConfiguration config;
- private final MetadataStoreExtended localMetadataStore;
- private final MetadataStoreExtended configurationMetadataStore;
- private final PulsarResources pulsarResources;
-
- private final OrderedExecutor executor;
-
- private final EventLoopGroup eventLoopGroup;
-
- private final ManagedLedgerStorage managedLedgerClientFactory;
-
- private final boolean useSpies;
-
- private final PulsarService pulsarService;
-
- private final Compactor compactor;
-
- private final BrokerService brokerService;
-
- @Getter(AccessLevel.NONE)
- @Singular
- private final List<AutoCloseable> cleanupFunctions;
-
- public ManagedLedgerFactory getManagedLedgerFactory() {
- return managedLedgerClientFactory.getManagedLedgerFactory();
- }
-
- public static FactoryBuilder builder() {
- return new CustomFactoryBuilder();
- }
-
- public void close() throws Exception {
- pulsarService.getBrokerService().close();
- pulsarService.close();
- GracefulExecutorServicesShutdown.initiate()
- .timeout(Duration.ZERO)
- .shutdown(executor)
- .handle().get();
- eventLoopGroup.shutdownGracefully().get();
- if (localMetadataStore != configurationMetadataStore) {
- localMetadataStore.close();
- configurationMetadataStore.close();
- } else {
- localMetadataStore.close();
- }
- for (AutoCloseable cleanup : cleanupFunctions) {
- try {
- cleanup.close();
- } catch (Exception e) {
- log.error("Failure in calling cleanup function", e);
- }
- }
- }
-
- public ServerCnx createServerCnxSpy() {
- return
spyWithClassAndConstructorArgsRecordingInvocations(ServerCnx.class,
- getPulsarService());
- }
-
- public static class FactoryBuilder {
- protected boolean useTestPulsarResources = false;
- protected MetadataStore pulsarResourcesMetadataStore;
- protected Function<PulsarService, BrokerService>
brokerServiceFunction;
-
- public FactoryBuilder useTestPulsarResources() {
- useTestPulsarResources = true;
- return this;
- }
-
- public FactoryBuilder useTestPulsarResources(MetadataStore
metadataStore) {
- useTestPulsarResources = true;
- pulsarResourcesMetadataStore = metadataStore;
- return this;
- }
-
- public FactoryBuilder managedLedgerClients(BookKeeper
bookKeeperClient,
- ManagedLedgerFactory
managedLedgerFactory) {
- return managedLedgerClientFactory(
-
Factory.createManagedLedgerClientFactory(bookKeeperClient,
managedLedgerFactory));
- }
-
- public FactoryBuilder brokerServiceFunction(
- Function<PulsarService, BrokerService>
brokerServiceFunction) {
- this.brokerServiceFunction = brokerServiceFunction;
- return this;
- }
- }
-
- private static class CustomFactoryBuilder extends
Factory.FactoryBuilder {
-
- @Override
- public Factory build() {
- initializeDefaults();
- return super.build();
- }
-
- private void initializeDefaults() {
- try {
- if (super.managedLedgerClientFactory == null) {
- if (super.executor == null) {
- super.executor =
OrderedExecutor.newBuilder().numThreads(1)
-
.name(TestPulsarService.class.getSimpleName() + "-executor").build();
- }
- NonClosableMockBookKeeper mockBookKeeper;
- if (super.useSpies) {
- mockBookKeeper =
-
spyWithClassAndConstructorArgs(NonClosableMockBookKeeper.class, super.executor);
- } else {
- mockBookKeeper = new
NonClosableMockBookKeeper(super.executor);
- }
- cleanupFunction(() -> mockBookKeeper.reallyShutdown());
- ManagedLedgerFactory mlFactoryMock =
mock(ManagedLedgerFactory.class);
-
- managedLedgerClientFactory(
-
Factory.createManagedLedgerClientFactory(mockBookKeeper, mlFactoryMock));
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- if (super.config == null) {
- ServiceConfiguration svcConfig = new
ServiceConfiguration();
- svcConfig.setBrokerShutdownTimeoutMs(0L);
-
svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
- svcConfig.setClusterName("pulsar-cluster");
- config(svcConfig);
- }
- if (super.localMetadataStore == null ||
super.configurationMetadataStore == null) {
- try {
- MetadataStoreExtended store =
MetadataStoreFactoryImpl.createExtended("memory:local",
- MetadataStoreConfig.builder().build());
- if (super.localMetadataStore == null) {
- localMetadataStore(store);
- }
- if (super.configurationMetadataStore == null) {
- configurationMetadataStore(store);
- }
- } catch (MetadataStoreException e) {
- throw new RuntimeException(e);
- }
- }
- if (super.pulsarResources == null) {
- if (useTestPulsarResources) {
- MetadataStore metadataStore =
pulsarResourcesMetadataStore;
- if (metadataStore == null) {
- metadataStore = super.configurationMetadataStore;
- }
- NamespaceResources nsr =
-
spyWithClassAndConstructorArgs(NamespaceResources.class, metadataStore, 30);
- TopicResources tsr =
spyWithClassAndConstructorArgs(TopicResources.class, metadataStore);
- if (!super.useSpies) {
- pulsarResources(
- new
TestPulsarResources(super.localMetadataStore, super.configurationMetadataStore,
- tsr, nsr));
- } else {
- pulsarResources(
-
spyWithClassAndConstructorArgs(TestPulsarResources.class,
super.localMetadataStore,
- super.configurationMetadataStore,
tsr, nsr));
- }
- } else {
- if (!super.useSpies) {
- pulsarResources(
- new
PulsarResources(super.localMetadataStore, super.configurationMetadataStore));
- } else {
- pulsarResources(
-
spyWithClassAndConstructorArgs(PulsarResources.class, super.localMetadataStore,
- super.configurationMetadataStore));
- }
- }
- }
- if (super.brokerServiceFunction == null) {
- if (super.brokerService == null) {
- if (super.eventLoopGroup == null) {
- super.eventLoopGroup = new NioEventLoopGroup();
- }
- brokerServiceFunction(pulsarService -> {
- try {
- if (!super.useSpies) {
- return new
TestBrokerService(pulsarService, super.eventLoopGroup);
- } else {
- return
spyWithClassAndConstructorArgs(TestBrokerService.class, pulsarService,
- super.eventLoopGroup);
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- });
- } else {
- brokerServiceFunction(pulsarService ->
super.brokerService);
- }
- }
- if (!super.useSpies) {
- pulsarService(new TestPulsarService(super.config,
super.localMetadataStore,
- super.configurationMetadataStore,
super.pulsarResources, super.managedLedgerClientFactory,
- super.brokerServiceFunction, super.executor,
super.compactor));
- } else {
-
pulsarService(spyWithClassAndConstructorArgs(TestPulsarService.class,
super.config,
- super.localMetadataStore,
super.configurationMetadataStore, super.pulsarResources,
- super.managedLedgerClientFactory,
super.brokerServiceFunction, super.executor,
- super.compactor));
- }
- if (super.brokerService == null) {
- brokerService(super.pulsarService.getBrokerService());
- }
- }
- }
-
- @NotNull
- private static ManagedLedgerStorage
createManagedLedgerClientFactory(BookKeeper bookKeeperClient,
-
ManagedLedgerFactory managedLedgerFactory) {
- return new ManagedLedgerStorage() {
-
- @Override
- public void initialize(ServiceConfiguration conf,
MetadataStoreExtended metadataStore,
- BookKeeperClientFactory
bookkeeperProvider, EventLoopGroup eventLoopGroup)
- throws Exception {
-
- }
-
- @Override
- public ManagedLedgerFactory getManagedLedgerFactory() {
- return managedLedgerFactory;
- }
-
- @Override
- public StatsProvider getStatsProvider() {
- return new NullStatsProvider();
- }
-
- @Override
- public BookKeeper getBookKeeperClient() {
- return bookKeeperClient;
- }
-
- @Override
- public void close() throws IOException {
-
- }
- };
- }
- }
-
- private static class TestPulsarResources extends PulsarResources {
-
- private final TopicResources topicResources;
- private final NamespaceResources namespaceResources;
-
- public TestPulsarResources(MetadataStore localMetadataStore,
MetadataStore configurationMetadataStore,
- TopicResources topicResources,
NamespaceResources namespaceResources) {
- super(localMetadataStore, configurationMetadataStore);
- this.topicResources = topicResources;
- this.namespaceResources = namespaceResources;
- }
-
- @Override
- public TopicResources getTopicResources() {
- return topicResources;
- }
-
- @Override
- public NamespaceResources getNamespaceResources() {
- return namespaceResources;
- }
- }
-
- private static class TestBrokerService extends BrokerService {
-
- TestBrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup)
throws Exception {
- super(pulsar, eventLoopGroup);
- }
-
- @Override
- protected CompletableFuture<Map<String, String>>
fetchTopicPropertiesAsync(TopicName topicName) {
- return CompletableFuture.completedFuture(Collections.emptyMap());
- }
- }
-
- // Prevent the MockBookKeeper instance from being closed when the broker
is restarted within a test
- private static class NonClosableMockBookKeeper extends
PulsarMockBookKeeper {
-
- public NonClosableMockBookKeeper(OrderedExecutor executor) throws
Exception {
- super(executor);
- }
-
- @Override
- public void close() {
- // no-op
- }
-
- @Override
- public void shutdown() {
- // no-op
- }
-
- public void reallyShutdown() {
- super.shutdown();
- }
- }
-
- private final MetadataStoreExtended localMetadataStore;
- private final MetadataStoreExtended configurationMetadataStore;
- private final PulsarResources pulsarResources;
- private final ManagedLedgerStorage managedLedgerClientFactory;
- private final BrokerService brokerService;
-
- private final OrderedExecutor executor;
-
- private final Compactor compactor;
-
- private final SchemaRegistryService schemaRegistryService;
-
- private final PulsarClient pulsarClient;
-
- private final NamespaceService namespaceService;
-
- protected TestPulsarService(ServiceConfiguration config,
MetadataStoreExtended localMetadataStore,
- MetadataStoreExtended
configurationMetadataStore, PulsarResources pulsarResources,
- ManagedLedgerStorage
managedLedgerClientFactory,
- Function<PulsarService, BrokerService>
brokerServiceFunction, OrderedExecutor executor,
- Compactor compactor) {
- super(config);
- this.localMetadataStore = localMetadataStore;
- this.configurationMetadataStore = configurationMetadataStore;
- this.pulsarResources = pulsarResources;
- this.managedLedgerClientFactory = managedLedgerClientFactory;
- this.brokerService = brokerServiceFunction.apply(this);
- this.executor = executor;
- this.compactor = compactor;
- this.schemaRegistryService =
spyWithClassAndConstructorArgs(DefaultSchemaRegistryService.class);
- this.pulsarClient = mock(PulsarClientImpl.class);
- this.namespaceService = mock(NamespaceService.class);
- try {
- startNamespaceService();
- } catch (PulsarServerException e) {
- throw new RuntimeException(e);
- }
- }
-
-
- @Override
- public Supplier<NamespaceService> getNamespaceServiceProvider() throws
PulsarServerException {
- return () -> namespaceService;
- }
-
- @Override
- public synchronized PulsarClient getClient() throws PulsarServerException {
- return pulsarClient;
- }
-
- @Override
- public SchemaRegistryService getSchemaRegistryService() {
- return schemaRegistryService;
- }
-
- @Override
- public MetadataStore
createConfigurationMetadataStore(PulsarMetadataEventSynchronizer synchronizer)
- throws MetadataStoreException {
- return configurationMetadataStore;
- }
-
- @Override
- public MetadataStoreExtended
createLocalMetadataStore(PulsarMetadataEventSynchronizer synchronizer)
- throws MetadataStoreException, PulsarServerException {
- return localMetadataStore;
- }
-
- @Override
- public PulsarResources getPulsarResources() {
- return pulsarResources;
- }
-
- public BrokerService getBrokerService() {
- return brokerService;
- }
-
- @Override
- public Compactor getCompactor() throws PulsarServerException {
- if (compactor != null) {
- return compactor;
- } else {
- return super.getCompactor();
- }
- }
-
- @Override
- public MetadataStore getConfigurationMetadataStore() {
- return configurationMetadataStore;
- }
-
- @Override
- public MetadataStoreExtended getLocalMetadataStore() {
- return localMetadataStore;
- }
-
- @Override
- public ManagedLedgerStorage getManagedLedgerClientFactory() {
- return managedLedgerClientFactory;
- }
-
- @Override
- public OrderedExecutor getOrderedExecutor() {
- return executor;
- }
-
- @Override
- protected PulsarResources newPulsarResources() {
- return pulsarResources;
- }
-
- @Override
- protected ManagedLedgerStorage newManagedLedgerClientFactory() throws
Exception {
- return managedLedgerClientFactory;
- }
-
- @Override
- protected BrokerService newBrokerService(PulsarService pulsar) throws
Exception {
- return brokerService;
- }
-}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java
index e4331d0b486..d5ea10da0f4 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java
@@ -16,54 +16,31 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.pulsar.broker.namespace;
-import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
-import com.google.common.util.concurrent.MoreExecutors;
-import io.netty.channel.EventLoopGroup;
-import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
-import org.apache.bookkeeper.client.PulsarMockBookKeeper;
-import org.apache.bookkeeper.common.util.OrderedExecutor;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.util.ZkUtils;
-import org.apache.pulsar.broker.BookKeeperClientFactory;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.auth.SameThreadOrderedSafeExecutor;
-import org.apache.pulsar.broker.intercept.CounterBrokerInterceptor;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.policies.data.TopicType;
-import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
-import org.apache.pulsar.metadata.impl.ZKMetadataStore;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.MockZooKeeper;
-import org.apache.zookeeper.MockZooKeeperSession;
-import org.apache.zookeeper.data.ACL;
@Slf4j
-public class OwnerShipForCurrentServerTestBase {
+public abstract class OwnerShipForCurrentServerTestBase {
public static final String CLUSTER_NAME = "test";
@Setter
private int brokerCount = 3;
- private final List<SameThreadOrderedSafeExecutor> orderedExecutorList =
new ArrayList<>();
@Getter
private final List<ServiceConfiguration> serviceConfigurationList = new
ArrayList<>();
@Getter
@@ -72,9 +49,7 @@ public class OwnerShipForCurrentServerTestBase {
protected PulsarAdmin admin;
protected PulsarClient pulsarClient;
- private MockZooKeeper mockZooKeeper;
- private OrderedExecutor bkExecutor;
- private NonClosableMockBookKeeper mockBookKeeper;
+ protected List<PulsarTestContext> pulsarTestContexts = new ArrayList<>();
public void internalSetup() throws Exception {
init();
@@ -85,13 +60,6 @@ public class OwnerShipForCurrentServerTestBase {
}
private void init() throws Exception {
- mockZooKeeper = createMockZooKeeper();
-
- bkExecutor = OrderedExecutor.newBuilder()
- .numThreads(1)
- .name("mock-pulsar-bk")
- .build();
- mockBookKeeper = createMockBookKeeper(bkExecutor);
startBroker();
}
@@ -118,96 +86,22 @@ public class OwnerShipForCurrentServerTestBase {
conf.setWebServicePortTls(Optional.of(0));
serviceConfigurationList.add(conf);
- PulsarService pulsar =
spyWithClassAndConstructorArgs(PulsarService.class, conf);
-
- setupBrokerMocks(pulsar);
- pulsar.start();
+ PulsarTestContext.Builder testContextBuilder =
+ PulsarTestContext.startableBuilder()
+ .config(conf);
+ if (i > 0) {
+
testContextBuilder.reuseMockBookkeeperAndMetadataStores(pulsarTestContexts.get(0));
+ } else {
+ testContextBuilder.withMockZookeeper();
+ }
+ PulsarTestContext pulsarTestContext = testContextBuilder
+ .build();
+ PulsarService pulsar = pulsarTestContext.getPulsarService();
pulsarServiceList.add(pulsar);
+ pulsarTestContexts.add(pulsarTestContext);
}
}
- protected void setupBrokerMocks(PulsarService pulsar) throws Exception {
- // Override default providers with mocked ones
-
doReturn(mockBookKeeperClientFactory).when(pulsar).newBookKeeperClientFactory();
- MockZooKeeperSession mockZooKeeperSession =
MockZooKeeperSession.newInstance(mockZooKeeper);
- doReturn(new
ZKMetadataStore(mockZooKeeperSession)).when(pulsar).createLocalMetadataStore(null);
- doReturn(new
ZKMetadataStore(mockZooKeeperSession)).when(pulsar).createConfigurationMetadataStore(null);
- Supplier<NamespaceService> namespaceServiceSupplier = () ->
spyWithClassAndConstructorArgs(
- NamespaceService.class, pulsar);
-
doReturn(namespaceServiceSupplier).when(pulsar).getNamespaceServiceProvider();
-
- SameThreadOrderedSafeExecutor executor = new
SameThreadOrderedSafeExecutor();
- orderedExecutorList.add(executor);
- doReturn(executor).when(pulsar).getOrderedExecutor();
- doReturn(new
CounterBrokerInterceptor()).when(pulsar).getBrokerInterceptor();
-
- doAnswer((invocation) ->
spy(invocation.callRealMethod())).when(pulsar).newCompactor();
- }
-
- public static MockZooKeeper createMockZooKeeper() throws Exception {
- MockZooKeeper zk =
MockZooKeeper.newInstance(MoreExecutors.newDirectExecutorService());
- List<ACL> dummyAclList = new ArrayList<>(0);
-
- ZkUtils.createFullPathOptimistic(zk, "/ledgers/available/192.168.1.1:"
+ 5000,
- "".getBytes(StandardCharsets.UTF_8), dummyAclList,
CreateMode.PERSISTENT);
-
- zk.create("/ledgers/LAYOUT",
"1\nflat:1".getBytes(StandardCharsets.UTF_8), dummyAclList,
- CreateMode.PERSISTENT);
- return zk;
- }
-
- public static NonClosableMockBookKeeper
createMockBookKeeper(OrderedExecutor executor) throws Exception {
- return spyWithClassAndConstructorArgs(NonClosableMockBookKeeper.class,
executor);
- }
-
- // Prevent the MockBookKeeper instance from being closed when the broker
is restarted within a test
- public static class NonClosableMockBookKeeper extends PulsarMockBookKeeper
{
-
- public NonClosableMockBookKeeper(OrderedExecutor executor) throws
Exception {
- super(executor);
- }
-
- @Override
- public void close() {
- // no-op
- }
-
- @Override
- public void shutdown() {
- // no-op
- }
-
- public void reallyShutdown() {
- super.shutdown();
- }
- }
-
- private final BookKeeperClientFactory mockBookKeeperClientFactory = new
BookKeeperClientFactory() {
-
- @Override
- public BookKeeper create(ServiceConfiguration conf,
MetadataStoreExtended store,
- EventLoopGroup eventLoopGroup,
- Optional<Class<? extends
EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
- Map<String, Object> properties) {
- // Always return the same instance (so that we don't lose the
mock BK content on broker restart)
- return mockBookKeeper;
- }
-
- @Override
- public BookKeeper create(ServiceConfiguration conf,
MetadataStoreExtended store,
- EventLoopGroup eventLoopGroup,
- Optional<Class<? extends
EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
- Map<String, Object> properties, StatsLogger
statsLogger) {
- // Always return the same instance (so that we don't lose the
mock BK content on broker restart)
- return mockBookKeeper;
- }
-
- @Override
- public void close() {
- // no-op
- }
- };
-
protected final void internalCleanup() {
try {
// if init fails, some of these could be null, and if so would
throw
@@ -220,49 +114,19 @@ public class OwnerShipForCurrentServerTestBase {
pulsarClient.shutdown();
pulsarClient = null;
}
- if (pulsarServiceList.size() > 0) {
- for (PulsarService pulsarService : pulsarServiceList) {
- pulsarService.close();
+ if (pulsarTestContexts.size() > 0) {
+ for(int i = pulsarTestContexts.size() - 1; i >= 0; i--) {
+ pulsarTestContexts.get(i).close();
}
- pulsarServiceList.clear();
+ pulsarTestContexts.clear();
}
+ pulsarServiceList.clear();
if (serviceConfigurationList.size() > 0) {
serviceConfigurationList.clear();
}
- if (mockBookKeeper != null) {
- mockBookKeeper.reallyShutdown();
- }
- if (mockZooKeeper != null) {
- mockZooKeeper.shutdown();
- }
- if (orderedExecutorList.size() > 0) {
- for (int i = 0; i < orderedExecutorList.size(); i++) {
- SameThreadOrderedSafeExecutor
sameThreadOrderedSafeExecutor = orderedExecutorList.get(i);
- if(sameThreadOrderedSafeExecutor != null) {
- try {
- sameThreadOrderedSafeExecutor.shutdownNow();
- sameThreadOrderedSafeExecutor.awaitTermination(5,
TimeUnit.SECONDS);
- } catch (InterruptedException ex) {
- log.error("sameThreadOrderedSafeExecutor shutdown
had error", ex);
- Thread.currentThread().interrupt();
- }
- orderedExecutorList.set(i, null);
- }
- }
- }
- if(bkExecutor != null) {
- try {
- bkExecutor.shutdownNow();
- bkExecutor.awaitTermination(5, TimeUnit.SECONDS);
- } catch (InterruptedException ex) {
- log.error("bkExecutor shutdown had error", ex);
- Thread.currentThread().interrupt();
- }
- bkExecutor = null;
- }
+
} catch (Exception e) {
log.warn("Failed to clean up mocked pulsar service:", e);
}
}
-
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
index 703376b2546..d01fa1fa540 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
@@ -38,7 +38,7 @@ import io.netty.channel.ChannelHandlerContext;
import java.net.InetSocketAddress;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
-import org.apache.pulsar.broker.TestPulsarService;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.MessageId;
@@ -57,14 +57,14 @@ public class MessageCumulativeAckTest {
private ServerCnx serverCnx;
private PersistentSubscription sub;
- private TestPulsarService.Factory testPulsarServiceFactory;
+ private PulsarTestContext pulsarTestContext;
@BeforeMethod
public void setup() throws Exception {
- testPulsarServiceFactory = TestPulsarService.Factory.builder()
+ pulsarTestContext = PulsarTestContext.builder()
.build();
- serverCnx = testPulsarServiceFactory.createServerCnxSpy();
+ serverCnx = pulsarTestContext.createServerCnxSpy();
doReturn(true).when(serverCnx).isActive();
doReturn(true).when(serverCnx).isWritable();
doReturn(new InetSocketAddress("localhost",
1234)).when(serverCnx).clientAddress();
@@ -74,7 +74,7 @@ public class MessageCumulativeAckTest {
.when(serverCnx).getCommandSender();
String topicName =
TopicName.get("MessageCumulativeAckTest").toString();
- PersistentTopic persistentTopic = new PersistentTopic(topicName,
mock(ManagedLedger.class), testPulsarServiceFactory.getBrokerService());
+ PersistentTopic persistentTopic = new PersistentTopic(topicName,
mock(ManagedLedger.class), pulsarTestContext.getBrokerService());
sub = spy(new PersistentSubscription(persistentTopic, "sub-1",
mock(ManagedCursorImpl.class), false));
doNothing().when(sub).acknowledgeMessage(any(), any(), any());
@@ -82,9 +82,9 @@ public class MessageCumulativeAckTest {
@AfterMethod(alwaysRun = true)
public void shutdown() throws Exception {
- if (testPulsarServiceFactory != null) {
- testPulsarServiceFactory.close();
- testPulsarServiceFactory = null;
+ if (pulsarTestContext != null) {
+ pulsarTestContext.close();
+ pulsarTestContext = null;
}
sub = null;
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index 4dfaade33ca..d9a9dbcbcf2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -60,7 +60,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.TestPulsarService;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.broker.namespace.NamespaceService;
import
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
@@ -91,7 +91,7 @@ public class PersistentDispatcherFailoverConsumerTest {
private ChannelHandlerContext channelCtx;
private LinkedBlockingQueue<CommandActiveConsumerChange> consumerChanges;
- protected TestPulsarService.Factory testPulsarServiceFactory;
+ protected PulsarTestContext pulsarTestContext;
final String successTopicName =
"persistent://part-perf/global/perf.t1/ptopic";
final String failTopicName =
"persistent://part-perf/global/perf.t1/pfailTopic";
@@ -104,9 +104,9 @@ public class PersistentDispatcherFailoverConsumerTest {
svcConfig.setClusterName("pulsar-cluster");
svcConfig.setSystemTopicEnabled(false);
svcConfig.setTopicLevelPoliciesEnabled(false);
- testPulsarServiceFactory = TestPulsarService.Factory.builder()
+ pulsarTestContext = PulsarTestContext.builder()
.config(svcConfig)
- .useSpies(true)
+ .spyByDefault()
.build();
consumerChanges = new LinkedBlockingQueue<>();
@@ -132,7 +132,7 @@ public class PersistentDispatcherFailoverConsumerTest {
return null;
}).when(channelCtx).writeAndFlush(any(), any());
- serverCnx = testPulsarServiceFactory.createServerCnxSpy();
+ serverCnx = pulsarTestContext.createServerCnxSpy();
doReturn(true).when(serverCnx).isActive();
doReturn(true).when(serverCnx).isWritable();
doReturn(new InetSocketAddress("localhost",
1234)).when(serverCnx).clientAddress();
@@ -141,7 +141,7 @@ public class PersistentDispatcherFailoverConsumerTest {
doReturn(new PulsarCommandSenderImpl(null, serverCnx))
.when(serverCnx).getCommandSender();
- serverCnxWithOldVersion =
testPulsarServiceFactory.createServerCnxSpy();
+ serverCnxWithOldVersion = pulsarTestContext.createServerCnxSpy();
doReturn(true).when(serverCnxWithOldVersion).isActive();
doReturn(true).when(serverCnxWithOldVersion).isWritable();
doReturn(new InetSocketAddress("localhost", 1234))
@@ -153,7 +153,7 @@ public class PersistentDispatcherFailoverConsumerTest {
.when(serverCnxWithOldVersion).getCommandSender();
NamespaceService nsSvc = mock(NamespaceService.class);
-
doReturn(nsSvc).when(testPulsarServiceFactory.getPulsarService()).getNamespaceService();
+
doReturn(nsSvc).when(pulsarTestContext.getPulsarService()).getNamespaceService();
doReturn(true).when(nsSvc).isServiceUnitOwned(any(NamespaceBundle.class));
doReturn(true).when(nsSvc).isServiceUnitActive(any(TopicName.class));
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkTopicOwnership(any(TopicName.class));
@@ -164,9 +164,9 @@ public class PersistentDispatcherFailoverConsumerTest {
@AfterMethod(alwaysRun = true)
public void shutdown() throws Exception {
- if (testPulsarServiceFactory != null) {
- testPulsarServiceFactory.close();
- testPulsarServiceFactory = null;
+ if (pulsarTestContext != null) {
+ pulsarTestContext.close();
+ pulsarTestContext = null;
}
}
@@ -181,7 +181,7 @@ public class PersistentDispatcherFailoverConsumerTest {
doAnswer(invocationOnMock -> {
((OpenLedgerCallback)
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
return null;
- }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+ }).when(pulsarTestContext.getManagedLedgerFactory())
.asyncOpen(matches(".*success.*"),
any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class), any(Supplier.class),
any());
@@ -190,7 +190,7 @@ public class PersistentDispatcherFailoverConsumerTest {
((OpenLedgerCallback) invocationOnMock.getArguments()[2])
.openLedgerFailed(new ManagedLedgerException("Managed
ledger failure"), null);
return null;
- }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+ }).when(pulsarTestContext.getManagedLedgerFactory())
.asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class), any(Supplier.class),
any());
@@ -231,7 +231,7 @@ public class PersistentDispatcherFailoverConsumerTest {
@Test
public void testConsumerGroupChangesWithOldNewConsumers() throws Exception
{
PersistentTopic topic =
- new PersistentTopic(successTopicName, ledgerMock,
testPulsarServiceFactory.getBrokerService());
+ new PersistentTopic(successTopicName, ledgerMock,
pulsarTestContext.getBrokerService());
PersistentSubscription sub = new PersistentSubscription(topic,
"sub-1", cursorMock, false);
int partitionIndex = 0;
@@ -272,7 +272,7 @@ public class PersistentDispatcherFailoverConsumerTest {
public void testAddRemoveConsumer() throws Exception {
log.info("--- Starting
PersistentDispatcherFailoverConsumerTest::testAddConsumer ---");
- PersistentTopic topic = new PersistentTopic(successTopicName,
ledgerMock, testPulsarServiceFactory.getBrokerService());
+ PersistentTopic topic = new PersistentTopic(successTopicName,
ledgerMock, pulsarTestContext.getBrokerService());
PersistentSubscription sub = new PersistentSubscription(topic,
"sub-1", cursorMock, false);
int partitionIndex = 4;
@@ -404,7 +404,7 @@ public class PersistentDispatcherFailoverConsumerTest {
log.info("--- Starting
PersistentDispatcherFailoverConsumerTest::testAddConsumer ---");
PersistentTopic topic =
- new PersistentTopic(successTopicName, ledgerMock,
testPulsarServiceFactory.getBrokerService());
+ new PersistentTopic(successTopicName, ledgerMock,
pulsarTestContext.getBrokerService());
PersistentSubscription sub = new PersistentSubscription(topic,
"sub-1", cursorMock, false);
// Non partitioned topic.
@@ -465,7 +465,7 @@ public class PersistentDispatcherFailoverConsumerTest {
public void
testMultipleDispatcherGetNextConsumerWithDifferentPriorityLevel() throws
Exception {
PersistentTopic topic =
- new PersistentTopic(successTopicName, ledgerMock,
testPulsarServiceFactory.getBrokerService());
+ new PersistentTopic(successTopicName, ledgerMock,
pulsarTestContext.getBrokerService());
PersistentDispatcherMultipleConsumers dispatcher = new
PersistentDispatcherMultipleConsumers(topic, cursorMock, null);
Consumer consumer1 = createConsumer(topic, 0, 2, false, 1);
Consumer consumer2 = createConsumer(topic, 0, 2, false, 2);
@@ -510,7 +510,7 @@ public class PersistentDispatcherFailoverConsumerTest {
@Test
public void testFewBlockedConsumerSamePriority() throws Exception{
PersistentTopic topic =
- new PersistentTopic(successTopicName, ledgerMock,
testPulsarServiceFactory.getBrokerService());
+ new PersistentTopic(successTopicName, ledgerMock,
pulsarTestContext.getBrokerService());
PersistentDispatcherMultipleConsumers dispatcher = new
PersistentDispatcherMultipleConsumers(topic, cursorMock, null);
Consumer consumer1 = createConsumer(topic, 0, 2, false, 1);
Consumer consumer2 = createConsumer(topic, 0, 2, false, 2);
@@ -538,7 +538,7 @@ public class PersistentDispatcherFailoverConsumerTest {
@Test
public void testFewBlockedConsumerDifferentPriority() throws Exception {
PersistentTopic topic =
- new PersistentTopic(successTopicName, ledgerMock,
testPulsarServiceFactory.getBrokerService());
+ new PersistentTopic(successTopicName, ledgerMock,
pulsarTestContext.getBrokerService());
PersistentDispatcherMultipleConsumers dispatcher = new
PersistentDispatcherMultipleConsumers(topic, cursorMock, null);
Consumer consumer1 = createConsumer(topic, 0, 2, false, 1);
Consumer consumer2 = createConsumer(topic, 0, 2, false, 2);
@@ -593,7 +593,7 @@ public class PersistentDispatcherFailoverConsumerTest {
@Test
public void testFewBlockedConsumerDifferentPriority2() throws Exception {
PersistentTopic topic =
- new PersistentTopic(successTopicName, ledgerMock,
testPulsarServiceFactory.getBrokerService());
+ new PersistentTopic(successTopicName, ledgerMock,
pulsarTestContext.getBrokerService());
PersistentDispatcherMultipleConsumers dispatcher = new
PersistentDispatcherMultipleConsumers(topic, cursorMock, null);
Consumer consumer1 = createConsumer(topic, 0, 2, true, 1);
Consumer consumer2 = createConsumer(topic, 0, 2, true, 2);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index ae9fb9a1bf7..0bf60679663 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.pulsar.broker.service;
import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgsRecordingInvocations;
@@ -91,7 +92,6 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.TestPulsarService;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.persistent.CompactorSubscription;
import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator;
@@ -100,10 +100,12 @@ import
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleAct
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.ProducerBuilderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
@@ -155,7 +157,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
final String successSubName2 = "successSub2";
private static final Logger log =
LoggerFactory.getLogger(PersistentTopicTest.class);
- protected TestPulsarService.Factory testPulsarServiceFactory;
+ protected PulsarTestContext pulsarTestContext;
private BrokerService brokerService;
@@ -170,24 +172,24 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
svcConfig.setClusterName("pulsar-cluster");
svcConfig.setTopicLevelPoliciesEnabled(false);
svcConfig.setSystemTopicEnabled(false);
- testPulsarServiceFactory = TestPulsarService.Factory.builder()
+ pulsarTestContext = PulsarTestContext.builder()
.config(svcConfig)
- .useSpies(true)
+ .spyByDefault()
.useTestPulsarResources(metadataStore)
.compactor(mock(Compactor.class))
.build();
- brokerService = testPulsarServiceFactory.getBrokerService();
+ brokerService = pulsarTestContext.getBrokerService();
doAnswer(invocationOnMock -> CompletableFuture.completedFuture(null))
-
.when(testPulsarServiceFactory.getManagedLedgerFactory()).getManagedLedgerPropertiesAsync(any());
+
.when(pulsarTestContext.getManagedLedgerFactory()).getManagedLedgerPropertiesAsync(any());
doAnswer(invocation -> {
DeleteLedgerCallback deleteLedgerCallback =
invocation.getArgument(1);
deleteLedgerCallback.deleteLedgerComplete(null);
return null;
-
}).when(testPulsarServiceFactory.getManagedLedgerFactory()).asyncDelete(any(),
any(), any());
+
}).when(pulsarTestContext.getManagedLedgerFactory()).asyncDelete(any(), any(),
any());
// Mock serviceCnx.
serverCnx =
spyWithClassAndConstructorArgsRecordingInvocations(ServerCnx.class,
- testPulsarServiceFactory.getPulsarService());
+ pulsarTestContext.getPulsarService());
doReturn(true).when(serverCnx).isActive();
doReturn(true).when(serverCnx).isWritable();
doReturn(new InetSocketAddress("localhost",
1234)).when(serverCnx).clientAddress();
@@ -202,7 +204,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
NamespaceService nsSvc = mock(NamespaceService.class);
NamespaceBundle bundle = mock(NamespaceBundle.class);
doReturn(CompletableFuture.completedFuture(bundle)).when(nsSvc).getBundleAsync(any());
-
doReturn(nsSvc).when(testPulsarServiceFactory.getPulsarService()).getNamespaceService();
+
doReturn(nsSvc).when(pulsarTestContext.getPulsarService()).getNamespaceService();
doReturn(true).when(nsSvc).isServiceUnitOwned(any());
doReturn(true).when(nsSvc).isServiceUnitActive(any());
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).isServiceUnitActiveAsync(any());
@@ -213,9 +215,9 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
@AfterMethod(alwaysRun = true)
public void teardown() throws Exception {
- if (testPulsarServiceFactory != null) {
- testPulsarServiceFactory.close();
- testPulsarServiceFactory = null;
+ if (pulsarTestContext != null) {
+ pulsarTestContext.close();
+ pulsarTestContext = null;
}
}
@@ -228,7 +230,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
doAnswer(invocationOnMock -> {
((OpenLedgerCallback)
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
return null;
- }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+ }).when(pulsarTestContext.getManagedLedgerFactory())
.asyncOpen(anyString(), any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class),
any(Supplier.class), any());
@@ -254,7 +256,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
.openLedgerFailed(new ManagedLedgerException("Managed
ledger failure"), null)).start();
return null;
- }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+ }).when(pulsarTestContext.getManagedLedgerFactory())
.asyncOpen(anyString(), any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class),
any(Supplier.class), any());
@@ -315,19 +317,24 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
@Test
public void testDispatcherMultiConsumerReadFailed() {
- PersistentTopic topic =
spyWithClassAndConstructorArgsRecordingInvocations(PersistentTopic.class,
successTopicName, ledgerMock, brokerService);
+ PersistentTopic topic =
+
spyWithClassAndConstructorArgsRecordingInvocations(PersistentTopic.class,
successTopicName, ledgerMock,
+ brokerService);
ManagedCursor cursor = mock(ManagedCursor.class);
when(cursor.getName()).thenReturn("cursor");
Subscription subscription = mock(Subscription.class);
when(subscription.getName()).thenReturn("sub");
- PersistentDispatcherMultipleConsumers dispatcher = new
PersistentDispatcherMultipleConsumers(topic, cursor, subscription);
+ PersistentDispatcherMultipleConsumers dispatcher =
+ new PersistentDispatcherMultipleConsumers(topic, cursor,
subscription);
dispatcher.readEntriesFailed(new
ManagedLedgerException.InvalidCursorPositionException("failed"), null);
verify(topic, atLeast(1)).getBrokerService();
}
@Test
public void testDispatcherSingleConsumerReadFailed() {
- PersistentTopic topic =
spyWithClassAndConstructorArgsRecordingInvocations(PersistentTopic.class,
successTopicName, ledgerMock, brokerService);
+ PersistentTopic topic =
+
spyWithClassAndConstructorArgsRecordingInvocations(PersistentTopic.class,
successTopicName, ledgerMock,
+ brokerService);
ManagedCursor cursor = mock(ManagedCursor.class);
when(cursor.getName()).thenReturn("cursor");
PersistentDispatcherSingleActiveConsumer dispatcher = new
PersistentDispatcherSingleActiveConsumer(cursor,
@@ -517,7 +524,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
@Test
public void testMaxProducersForBroker() {
// set max clients
- testPulsarServiceFactory.getConfig().setMaxProducersPerTopic(2);
+ pulsarTestContext.getConfig().setMaxProducersPerTopic(2);
testMaxProducers();
}
@@ -526,9 +533,9 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
// set max clients
Policies policies = new Policies();
policies.max_producers_per_topic = 2;
- testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
-
.createPolicies(TopicName.get(successTopicName).getNamespaceObject(),
- policies);
+ pulsarTestContext.getPulsarResources().getNamespaceResources()
+
.createPolicies(TopicName.get(successTopicName).getNamespaceObject(),
+ policies);
testMaxProducers();
}
@@ -536,7 +543,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
final String producerNameBase = "producer";
final String role = "appid1";
- ServerCnx cnx = testPulsarServiceFactory.createServerCnxSpy();
+ ServerCnx cnx = pulsarTestContext.createServerCnxSpy();
doReturn(true).when(cnx).isActive();
doReturn(true).when(cnx).isWritable();
doReturn(new InetSocketAddress(address,
1234)).when(cnx).clientAddress();
@@ -550,7 +557,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
@Test
public void testMaxSameAddressProducers() throws Exception {
// set max clients
-
testPulsarServiceFactory.getConfig().setMaxSameAddressProducersPerTopic(2);
+ pulsarTestContext.getConfig().setMaxSameAddressProducersPerTopic(2);
PersistentTopic topic = new PersistentTopic(successTopicName,
ledgerMock, brokerService);
@@ -694,7 +701,8 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
Consumer consumer = new Consumer(sub, SubType.Exclusive,
topic.getName(), 1, 0, "Cons1", true, serverCnx,
"myrole-1", Collections.emptyMap(), false,
- new
KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT), MessageId.latest,
DEFAULT_CONSUMER_EPOCH);
+ new
KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT), MessageId.latest,
+ DEFAULT_CONSUMER_EPOCH);
sub.addConsumer(consumer);
consumer.close();
@@ -705,7 +713,8 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
consumer = new Consumer(sub, subType, topic.getName(), 1, 0,
"Cons1", true, serverCnx, "myrole-1",
Collections.emptyMap(), false,
- new
KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT), MessageId.latest,
DEFAULT_CONSUMER_EPOCH);
+ new
KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT), MessageId.latest,
+ DEFAULT_CONSUMER_EPOCH);
sub.addConsumer(consumer);
assertTrue(sub.getDispatcher().isConsumerConnected());
@@ -727,8 +736,10 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
PersistentSubscription sub = new PersistentSubscription(topic,
"sub-1", cursorMock, false);
// 1. simple add consumer
- Consumer consumer = new Consumer(sub, SubType.Exclusive,
topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */,
- true, serverCnx, "myrole-1", Collections.emptyMap(), false /*
read compacted */, null, MessageId.latest, DEFAULT_CONSUMER_EPOCH);
+ Consumer consumer = new Consumer(sub, SubType.Exclusive,
topic.getName(), 1 /* consumer id */, 0,
+ "Cons1"/* consumer name */,
+ true, serverCnx, "myrole-1", Collections.emptyMap(), false /*
read compacted */, null, MessageId.latest,
+ DEFAULT_CONSUMER_EPOCH);
sub.addConsumer(consumer);
assertTrue(sub.getDispatcher().isConsumerConnected());
@@ -853,8 +864,8 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
@Test
public void testMaxConsumersSharedForBroker() throws Exception {
// set max clients
- testPulsarServiceFactory.getConfig().setMaxConsumersPerSubscription(2);
- testPulsarServiceFactory.getConfig().setMaxConsumersPerTopic(3);
+ pulsarTestContext.getConfig().setMaxConsumersPerSubscription(2);
+ pulsarTestContext.getConfig().setMaxConsumersPerTopic(3);
testMaxConsumersShared();
}
@@ -865,7 +876,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
policies.max_consumers_per_subscription = 2;
policies.max_consumers_per_topic = 3;
- testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
+ pulsarTestContext.getPulsarResources().getNamespaceResources()
.createPolicies(TopicName.get(successTopicName).getNamespaceObject(),
policies);
@@ -949,8 +960,8 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
@Test
public void testMaxConsumersFailoverForBroker() throws Exception {
// set max clients
- testPulsarServiceFactory.getConfig().setMaxConsumersPerSubscription(2);
- testPulsarServiceFactory.getConfig().setMaxConsumersPerTopic(3);
+ pulsarTestContext.getConfig().setMaxConsumersPerSubscription(2);
+ pulsarTestContext.getConfig().setMaxConsumersPerTopic(3);
testMaxConsumersFailover();
}
@@ -962,7 +973,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
policies.max_consumers_per_subscription = 2;
policies.max_consumers_per_topic = 3;
- testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
+ pulsarTestContext.getPulsarResources().getNamespaceResources()
.createPolicies(TopicName.get(successTopicName).getNamespaceObject(),
policies);
@@ -974,7 +985,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
final String consumerNameBase = "consumer";
final String role = "appid1";
- ServerCnx cnx = testPulsarServiceFactory.createServerCnxSpy();
+ ServerCnx cnx = pulsarTestContext.createServerCnxSpy();
doReturn(true).when(cnx).isActive();
doReturn(true).when(cnx).isWritable();
doReturn(new InetSocketAddress(address,
1234)).when(cnx).clientAddress();
@@ -988,7 +999,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
@Test
public void testMaxSameAddressConsumers() throws Exception {
// set max clients
-
testPulsarServiceFactory.getConfig().setMaxSameAddressConsumersPerTopic(2);
+ pulsarTestContext.getConfig().setMaxSameAddressConsumersPerTopic(2);
PersistentTopic topic = new PersistentTopic(successTopicName,
ledgerMock, brokerService);
PersistentSubscription sub1 = new PersistentSubscription(topic,
"sub1", cursorMock, false);
@@ -1091,8 +1102,10 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
public void testUbsubscribeRaceConditions() throws Exception {
PersistentTopic topic = new PersistentTopic(successTopicName,
ledgerMock, brokerService);
PersistentSubscription sub = new PersistentSubscription(topic,
"sub-1", cursorMock, false);
- Consumer consumer1 = new Consumer(sub, SubType.Exclusive,
topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */,
- true, serverCnx, "myrole-1", Collections.emptyMap(), false /*
read compacted */, null, MessageId.latest, DEFAULT_CONSUMER_EPOCH);
+ Consumer consumer1 = new Consumer(sub, SubType.Exclusive,
topic.getName(), 1 /* consumer id */, 0,
+ "Cons1"/* consumer name */,
+ true, serverCnx, "myrole-1", Collections.emptyMap(), false /*
read compacted */, null, MessageId.latest,
+ DEFAULT_CONSUMER_EPOCH);
sub.addConsumer(consumer1);
doAnswer(invocationOnMock -> {
@@ -1113,7 +1126,8 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
Thread.sleep(10); /* delay to ensure that the ubsubscribe gets
executed first */
sub.addConsumer(new Consumer(sub, SubType.Exclusive,
topic.getName(), 2 /* consumer id */,
0, "Cons2"/* consumer name */, true, serverCnx,
- "myrole-1", Collections.emptyMap(), false /* read
compacted */, null, MessageId.latest, DEFAULT_CONSUMER_EPOCH)).get();
+ "myrole-1", Collections.emptyMap(), false /* read
compacted */, null, MessageId.latest,
+ DEFAULT_CONSUMER_EPOCH)).get();
fail();
} catch (Exception e) {
assertTrue(e.getCause() instanceof
BrokerServiceException.SubscriptionFencedException);
@@ -1350,7 +1364,8 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
String role = "appid1";
Thread.sleep(10); /* delay to ensure that the delete gets executed
first */
Producer producer = new Producer(topic, serverCnx, 1 /* producer
id */, "prod-name",
- role, false, null, SchemaVersion.Latest, 0, false,
ProducerAccessMode.Shared, Optional.empty(), true);
+ role, false, null, SchemaVersion.Latest, 0, false,
ProducerAccessMode.Shared, Optional.empty(),
+ true);
topic.addProducer(producer, new CompletableFuture<>()).join();
fail("Should have failed");
} catch (Exception e) {
@@ -1412,7 +1427,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
doAnswer(invocationOnMock -> {
((OpenLedgerCallback)
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
return null;
- }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+ }).when(pulsarTestContext.getManagedLedgerFactory())
.asyncOpen(matches(".*success.*"),
any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class), any(Supplier.class),
any());
@@ -1421,7 +1436,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
((OpenLedgerCallback) invocationOnMock.getArguments()[2])
.openLedgerFailed(new ManagedLedgerException("Managed
ledger failure"), null);
return null;
- }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+ }).when(pulsarTestContext.getManagedLedgerFactory())
.asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class), any(Supplier.class),
any());
@@ -1437,7 +1452,9 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
doAnswer(invocationOnMock -> {
((OpenCursorCallback)
invocationOnMock.getArguments()[2]).openCursorComplete(cursorMock, null);
return null;
- }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"),
any(InitialPosition.class), any(OpenCursorCallback.class), any());
+ }).when(ledgerMock)
+ .asyncOpenCursor(matches(".*success.*"),
any(InitialPosition.class), any(OpenCursorCallback.class),
+ any());
doAnswer(invocationOnMock -> {
((OpenCursorCallback)
invocationOnMock.getArguments()[4]).openCursorComplete(cursorMock, null);
@@ -1462,10 +1479,10 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
}).when(ledgerMock).asyncDeleteCursor(matches(".*success.*"),
any(DeleteCursorCallback.class), any());
doAnswer((invokactionOnMock) -> {
- ((MarkDeleteCallback) invokactionOnMock.getArguments()[2])
+ ((MarkDeleteCallback) invokactionOnMock.getArguments()[2])
.markDeleteComplete(invokactionOnMock.getArguments()[3]);
- return null;
- }).when(cursorMock).asyncMarkDelete(any(), any(),
any(MarkDeleteCallback.class), any());
+ return null;
+ }).when(cursorMock).asyncMarkDelete(any(), any(),
any(MarkDeleteCallback.class), any());
}
@@ -1683,20 +1700,26 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
String remoteReplicatorName = topic.getReplicatorPrefix() + "." +
remoteCluster;
ConcurrentOpenHashMap<String, Replicator> replicatorMap =
topic.getReplicators();
- PulsarService pulsar = testPulsarServiceFactory.getPulsarService();
- final URL brokerUrl = new URL(
- "http://" + pulsar.getAdvertisedAddress() + ":" +
pulsar.getConfiguration().getBrokerServicePort()
- .get());
- @Cleanup
- PulsarClient client =
PulsarClient.builder().serviceUrl(brokerUrl.toString()).build();
ManagedCursor cursor = mock(ManagedCursorImpl.class);
doReturn(remoteCluster).when(cursor).getName();
- brokerService.getReplicationClients().put(remoteCluster, client);
+ PulsarClientImpl pulsarClientMock = mock(PulsarClientImpl.class);
+ when(pulsarClientMock.newProducer(any())).thenAnswer(
+ invocation -> {
+ ProducerBuilderImpl producerBuilder =
+ new ProducerBuilderImpl(pulsarClientMock,
invocation.getArgument(0)) {
+ @Override
+ public
CompletableFuture<org.apache.pulsar.client.api.Producer> createAsync() {
+ return
CompletableFuture.completedFuture(mock(org.apache.pulsar.client.api.Producer.class));
+ }
+ };
+ return producerBuilder;
+ });
+ brokerService.getReplicationClients().put(remoteCluster,
pulsarClientMock);
PersistentReplicator replicator = spy(
new GeoPersistentReplicator(topic, cursor, localCluster,
remoteCluster, brokerService,
(PulsarClientImpl)
brokerService.getReplicationClient(remoteCluster,
brokerService.pulsar().getPulsarResources().getClusterResources()
- .getCluster(remoteCluster))));
+ .getCluster(remoteCluster))));
replicatorMap.put(remoteReplicatorName, replicator);
// step-1 remove replicator : it will disconnect the producer but it
will wait for callback to be completed
@@ -1730,7 +1753,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
PersistentTopic topic = new PersistentTopic(globalTopicName,
ledgerMock, brokerService);
- PulsarService pulsar = testPulsarServiceFactory.getPulsarService();
+ PulsarService pulsar = pulsarTestContext.getPulsarService();
final URL brokerUrl = new URL(
"http://" + pulsar.getAdvertisedAddress() + ":" +
pulsar.getConfiguration().getBrokerServicePort()
.get());
@@ -1738,7 +1761,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
PulsarClient client =
spy(PulsarClient.builder().serviceUrl(brokerUrl.toString()).build());
PulsarClientImpl clientImpl = (PulsarClientImpl) client;
doReturn(new CompletableFuture<Producer>()).when(clientImpl)
- .createProducerAsync(any(ProducerConfigurationData.class),
any(Schema.class));
+ .createProducerAsync(any(ProducerConfigurationData.class),
any(Schema.class));
ManagedCursor cursor = mock(ManagedCursorImpl.class);
doReturn(remoteCluster).when(cursor).getName();
@@ -1750,10 +1773,10 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
// PersistentReplicator constructor calls startProducer()
verify(clientImpl)
- .createProducerAsync(
- any(ProducerConfigurationData.class),
- any(), eq(null)
- );
+ .createProducerAsync(
+ any(ProducerConfigurationData.class),
+ any(), eq(null)
+ );
replicator.disconnect(false);
replicator.disconnect(false);
@@ -1770,8 +1793,8 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
when(compactedTopic.newCompactedLedger(any(Position.class), anyLong()))
.thenReturn(CompletableFuture.completedFuture(mock(CompactedTopicContext.class)));
PersistentSubscription sub = new CompactorSubscription(topic,
compactedTopic,
-
Compactor.COMPACTION_SUBSCRIPTION,
- cursorMock);
+ Compactor.COMPACTION_SUBSCRIPTION,
+ cursorMock);
PositionImpl position = new PositionImpl(1, 1);
long ledgerId = 0xc0bfefeL;
sub.acknowledgeMessage(Collections.singletonList(position),
AckType.Cumulative,
@@ -1800,13 +1823,13 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
@Test
public void testCompactionTriggeredAfterThresholdFirstInvocation() throws
Exception {
CompletableFuture<Long> compactPromise = new CompletableFuture<>();
- Compactor compactor =
testPulsarServiceFactory.getPulsarService().getCompactor();
+ Compactor compactor =
pulsarTestContext.getPulsarService().getCompactor();
doReturn(compactPromise).when(compactor).compact(anyString());
Policies policies = new Policies();
policies.compaction_threshold = 1L;
- testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
+ pulsarTestContext.getPulsarResources().getNamespaceResources()
.createPolicies(TopicName.get(successTopicName).getNamespaceObject(),
policies);
@@ -1831,7 +1854,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
@Test
public void testCompactionTriggeredAfterThresholdSecondInvocation() throws
Exception {
CompletableFuture<Long> compactPromise = new CompletableFuture<>();
- Compactor compactor =
testPulsarServiceFactory.getPulsarService().getCompactor();
+ Compactor compactor =
pulsarTestContext.getPulsarService().getCompactor();
doReturn(compactPromise).when(compactor).compact(anyString());
ManagedCursor subCursor = mock(ManagedCursor.class);
@@ -1841,7 +1864,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
Policies policies = new Policies();
policies.compaction_threshold = 1L;
- testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
+ pulsarTestContext.getPulsarResources().getNamespaceResources()
.createPolicies(TopicName.get(successTopicName).getNamespaceObject(),
policies);
@@ -1865,13 +1888,13 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
@Test
public void testCompactionDisabledWithZeroThreshold() throws Exception {
CompletableFuture<Long> compactPromise = new CompletableFuture<>();
- Compactor compactor =
testPulsarServiceFactory.getPulsarService().getCompactor();
+ Compactor compactor =
pulsarTestContext.getPulsarService().getCompactor();
doReturn(compactPromise).when(compactor).compact(anyString());
Policies policies = new Policies();
policies.compaction_threshold = 0L;
- testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
+ pulsarTestContext.getPulsarResources().getNamespaceResources()
.createPolicies(TopicName.get(successTopicName).getNamespaceObject(),
policies);
@@ -1886,7 +1909,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
@Test
public void testBacklogCursor() throws Exception {
int backloggedThreshold = 10;
-
testPulsarServiceFactory.getConfig().setManagedLedgerCursorBackloggedThreshold(backloggedThreshold);
+
pulsarTestContext.getConfig().setManagedLedgerCursorBackloggedThreshold(backloggedThreshold);
ManagedLedgerImpl ledger = (ManagedLedgerImpl)
factory.open("cache_backlog_ledger");
PersistentTopic topic = new PersistentTopic(successTopicName, ledger,
brokerService);
@@ -1895,22 +1918,28 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
// Open cursor1, add it into activeCursor-container and add it into
subscription consumer list
ManagedCursor cursor1 = ledger.openCursor("c1");
PersistentSubscription sub1 = new PersistentSubscription(topic,
"sub-1", cursor1, false);
- Consumer consumer1 = new Consumer(sub1, SubType.Exclusive,
topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */,
- true, serverCnx, "myrole-1", Collections.emptyMap(), false /* read
compacted */, null, MessageId.latest, DEFAULT_CONSUMER_EPOCH);
+ Consumer consumer1 = new Consumer(sub1, SubType.Exclusive,
topic.getName(), 1 /* consumer id */, 0,
+ "Cons1"/* consumer name */,
+ true, serverCnx, "myrole-1", Collections.emptyMap(), false /*
read compacted */, null, MessageId.latest,
+ DEFAULT_CONSUMER_EPOCH);
topic.getSubscriptions().put(Codec.decode(cursor1.getName()), sub1);
sub1.addConsumer(consumer1);
// Open cursor2, add it into activeCursor-container and add it into
subscription consumer list
ManagedCursor cursor2 = ledger.openCursor("c2");
PersistentSubscription sub2 = new PersistentSubscription(topic,
"sub-2", cursor2, false);
- Consumer consumer2 = new Consumer(sub2, SubType.Exclusive,
topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */,
- true, serverCnx, "myrole-2", Collections.emptyMap(), false /* read
compacted */, null, MessageId.latest, DEFAULT_CONSUMER_EPOCH);
+ Consumer consumer2 = new Consumer(sub2, SubType.Exclusive,
topic.getName(), 2 /* consumer id */, 0,
+ "Cons2"/* consumer name */,
+ true, serverCnx, "myrole-2", Collections.emptyMap(), false /*
read compacted */, null, MessageId.latest,
+ DEFAULT_CONSUMER_EPOCH);
topic.getSubscriptions().put(Codec.decode(cursor2.getName()), sub2);
sub2.addConsumer(consumer2);
// Open cursor3, add it into activeCursor-container and do not add it
into subscription consumer list
ManagedCursor cursor3 = ledger.openCursor("c3");
PersistentSubscription sub3 = new PersistentSubscription(topic,
"sub-3", cursor3, false);
- Consumer consumer3 = new Consumer(sub2, SubType.Exclusive,
topic.getName(), 3 /* consumer id */, 0, "Cons2"/* consumer name */,
- true, serverCnx, "myrole-3", Collections.emptyMap(), false /* read
compacted */, null, MessageId.latest, DEFAULT_CONSUMER_EPOCH);
+ Consumer consumer3 = new Consumer(sub2, SubType.Exclusive,
topic.getName(), 3 /* consumer id */, 0,
+ "Cons2"/* consumer name */,
+ true, serverCnx, "myrole-3", Collections.emptyMap(), false /*
read compacted */, null, MessageId.latest,
+ DEFAULT_CONSUMER_EPOCH);
topic.getSubscriptions().put(Codec.decode(cursor3.getName()), sub3);
// Case1: cursors are active as haven't started
deactivateBacklogCursor scan
@@ -2006,13 +2035,19 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
.concurrencyLevel(1)
.build();
// This subscription is connected by consumer.
- PersistentSubscription nonDeletableSubscription1 =
spyWithClassAndConstructorArgsRecordingInvocations(PersistentSubscription.class,
topic, "nonDeletableSubscription1", cursorMock, false);
+ PersistentSubscription nonDeletableSubscription1 =
+
spyWithClassAndConstructorArgsRecordingInvocations(PersistentSubscription.class,
topic,
+ "nonDeletableSubscription1", cursorMock, false);
subscriptions.put(nonDeletableSubscription1.getName(),
nonDeletableSubscription1);
// This subscription is not connected by consumer.
- PersistentSubscription deletableSubscription1 =
spyWithClassAndConstructorArgsRecordingInvocations(PersistentSubscription.class,
topic, "deletableSubscription1", cursorMock, false);
+ PersistentSubscription deletableSubscription1 =
+
spyWithClassAndConstructorArgsRecordingInvocations(PersistentSubscription.class,
topic,
+ "deletableSubscription1", cursorMock, false);
subscriptions.put(deletableSubscription1.getName(),
deletableSubscription1);
// This subscription is replicated.
- PersistentSubscription nonDeletableSubscription2 =
spyWithClassAndConstructorArgsRecordingInvocations(PersistentSubscription.class,
topic, "nonDeletableSubscription2", cursorMock, true);
+ PersistentSubscription nonDeletableSubscription2 =
+
spyWithClassAndConstructorArgsRecordingInvocations(PersistentSubscription.class,
topic,
+ "nonDeletableSubscription2", cursorMock, true);
subscriptions.put(nonDeletableSubscription2.getName(),
nonDeletableSubscription2);
Field field = topic.getClass().getDeclaredField("subscriptions");
@@ -2027,11 +2062,11 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
true, serverCnx, "app1", Collections.emptyMap(), false, null,
MessageId.latest, DEFAULT_CONSUMER_EPOCH);
addConsumerToSubscription.invoke(topic, nonDeletableSubscription1,
consumer);
- testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
+ pulsarTestContext.getPulsarResources().getNamespaceResources()
.createPolicies(TopicName.get(successTopicName).getNamespaceObject(),
new Policies());
-
testPulsarServiceFactory.getConfig().setSubscriptionExpirationTimeMinutes(5);
+ pulsarTestContext.getConfig().setSubscriptionExpirationTimeMinutes(5);
doReturn(System.currentTimeMillis() -
TimeUnit.MINUTES.toMillis(6)).when(cursorMock).getLastActive();
@@ -2058,7 +2093,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
Field isClosingOrDeletingField =
PersistentTopic.class.getDeclaredField("isClosingOrDeleting");
isClosingOrDeletingField.setAccessible(true);
- testPulsarServiceFactory.getConfig().setTopicFencingTimeoutSeconds(10);
+ pulsarTestContext.getConfig().setTopicFencingTimeoutSeconds(10);
fence.invoke(topic);
unfence.invoke(topic);
ScheduledFuture<?> fencedTopicMonitoringTask = (ScheduledFuture<?>)
fencedTopicMonitoringTaskField.get(topic);
@@ -2067,7 +2102,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
assertFalse((boolean) isFencedField.get(topic));
assertFalse((boolean) isClosingOrDeletingField.get(topic));
- testPulsarServiceFactory.getConfig().setTopicFencingTimeoutSeconds(1);
+ pulsarTestContext.getConfig().setTopicFencingTimeoutSeconds(1);
fence.invoke(topic);
Thread.sleep(2000);
fencedTopicMonitoringTask = (ScheduledFuture<?>)
fencedTopicMonitoringTaskField.get(topic);
@@ -2079,7 +2114,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
@Test
public void testTopicCloseFencingTimeout() throws Exception {
- testPulsarServiceFactory.getConfig().setTopicFencingTimeoutSeconds(10);
+ pulsarTestContext.getConfig().setTopicFencingTimeoutSeconds(10);
Method fence = PersistentTopic.class.getDeclaredMethod("fence");
fence.setAccessible(true);
Field fencedTopicMonitoringTaskField =
PersistentTopic.class.getDeclaredField("fencedTopicMonitoringTask");
@@ -2106,7 +2141,8 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
Position mockPosition = mock(Position.class);
doReturn("test").when(mockCursor).getName();
doAnswer((Answer<Object>) invocationOnMock -> {
- ((AsyncCallbacks.FindEntryCallback)
invocationOnMock.getArguments()[2]).findEntryComplete(mockPosition,
invocationOnMock.getArguments()[3]);
+ ((AsyncCallbacks.FindEntryCallback)
invocationOnMock.getArguments()[2]).findEntryComplete(mockPosition,
+ invocationOnMock.getArguments()[3]);
return null;
}).when(mockCursor).asyncFindNewestMatching(any(), any(), any(),
any());
doAnswer((Answer<Object>) invocationOnMock -> {
@@ -2184,7 +2220,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
assertEquals(stats2.keySharedMode, "AUTO_SPLIT");
assertTrue(stats2.allowOutOfOrderDelivery);
- KeySharedMeta ksm = new
KeySharedMeta().setKeySharedMode(KeySharedMode.STICKY)
+ KeySharedMeta ksm = new
KeySharedMeta().setKeySharedMode(KeySharedMode.STICKY)
.setAllowOutOfOrderDelivery(false);
ksm.addHashRange().setStart(0).setEnd(65535);
Consumer consumer3 = new Consumer(sub3, SubType.Key_Shared,
topic.getName(), 3, 0, "Cons3", true, serverCnx,
@@ -2222,7 +2258,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
Set<String> namespaceClusters = new HashSet<>();
namespaceClusters.add("namespace-cluster");
policies.replication_clusters = namespaceClusters;
- testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
+ pulsarTestContext.getPulsarResources().getNamespaceResources()
.createPolicies(TopicName.get(successTopicName).getNamespaceObject(), policies);
topic = new PersistentTopic(successTopicName, ledgerMock,
brokerService);
@@ -2230,7 +2266,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
assertEquals(topic.getHierarchyTopicPolicies().getReplicationClusters().get(),
namespaceClusters);
TopicPoliciesService topicPoliciesService =
mock(TopicPoliciesService.class);
-
doReturn(topicPoliciesService).when(testPulsarServiceFactory.getPulsarService()).getTopicPoliciesService();
+
doReturn(topicPoliciesService).when(pulsarTestContext.getPulsarService()).getTopicPoliciesService();
CompletableFuture<Optional<TopicPolicies>> topicPoliciesFuture = new
CompletableFuture<>();
TopicPolicies topicPolicies = new TopicPolicies();
List<String> topicClusters = new ArrayList<>();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxAuthorizationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxAuthorizationTest.java
index 325cb8d84e4..c1e32c0886e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxAuthorizationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxAuthorizationTest.java
@@ -45,7 +45,7 @@ import java.util.Optional;
import java.util.Properties;
import javax.crypto.SecretKey;
import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.TestPulsarService;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
@@ -76,7 +76,7 @@ public class ServerCnxAuthorizationTest {
private ServiceConfiguration svcConfig;
- protected TestPulsarService.Factory testPulsarServiceFactory;
+ protected PulsarTestContext pulsarTestContext;
private BrokerService brokerService;
@BeforeMethod(alwaysRun = true)
@@ -95,21 +95,21 @@ public class ServerCnxAuthorizationTest {
properties.setProperty("tokenSecretKey", "data:;base64,"
+ Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded()));
svcConfig.setProperties(properties);
- testPulsarServiceFactory = TestPulsarService.Factory.builder()
+ pulsarTestContext = PulsarTestContext.builder()
.config(svcConfig)
- .useSpies(true)
+ .spyByDefault()
.build();
- brokerService = testPulsarServiceFactory.getBrokerService();
+ brokerService = pulsarTestContext.getBrokerService();
-
testPulsarServiceFactory.getPulsarResources().getTenantResources().createTenant("public",
+
pulsarTestContext.getPulsarResources().getTenantResources().createTenant("public",
TenantInfo.builder().build());
}
@AfterMethod(alwaysRun = true)
public void cleanup() throws Exception {
- if (testPulsarServiceFactory != null) {
- testPulsarServiceFactory.close();
- testPulsarServiceFactory = null;
+ if (pulsarTestContext != null) {
+ pulsarTestContext.close();
+ pulsarTestContext = null;
}
}
@@ -118,7 +118,7 @@ public class ServerCnxAuthorizationTest {
svcConfig.setAuthenticateOriginalAuthData(true);
- ServerCnx serverCnx = testPulsarServiceFactory.createServerCnxSpy();
+ ServerCnx serverCnx = pulsarTestContext.createServerCnxSpy();
ChannelHandlerContext channelHandlerContext =
mock(ChannelHandlerContext.class);
Channel channel = mock(Channel.class);
ChannelPipeline channelPipeline = mock(ChannelPipeline.class);
@@ -155,7 +155,7 @@ public class ServerCnxAuthorizationTest {
AuthorizationService authorizationService =
spyWithClassAndConstructorArgsRecordingInvocations(AuthorizationService.class,
svcConfig,
- testPulsarServiceFactory.getPulsarResources());
+ pulsarTestContext.getPulsarResources());
doReturn(authorizationService).when(brokerService).getAuthorizationService();
// lookup
@@ -225,7 +225,7 @@ public class ServerCnxAuthorizationTest {
public void testVerifyOriginalPrincipalWithoutAuthDataForwardedFromProxy()
throws Exception {
svcConfig.setAuthenticateOriginalAuthData(false);
- ServerCnx serverCnx = testPulsarServiceFactory.createServerCnxSpy();
+ ServerCnx serverCnx = pulsarTestContext.createServerCnxSpy();
ChannelHandlerContext channelHandlerContext =
mock(ChannelHandlerContext.class);
Channel channel = mock(Channel.class);
ChannelPipeline channelPipeline = mock(ChannelPipeline.class);
@@ -257,7 +257,7 @@ public class ServerCnxAuthorizationTest {
AuthorizationService authorizationService =
spyWithClassAndConstructorArgsRecordingInvocations(AuthorizationService.class,
svcConfig,
- testPulsarServiceFactory.getPulsarResources());
+ pulsarTestContext.getPulsarResources());
doReturn(authorizationService).when(brokerService).getAuthorizationService();
// lookup
@@ -318,7 +318,7 @@ public class ServerCnxAuthorizationTest {
@Test
public void testVerifyAuthRoleAndAuthDataFromDirectConnectionBroker()
throws Exception {
- ServerCnx serverCnx = testPulsarServiceFactory.createServerCnxSpy();
+ ServerCnx serverCnx = pulsarTestContext.createServerCnxSpy();
ChannelHandlerContext channelHandlerContext =
mock(ChannelHandlerContext.class);
Channel channel = mock(Channel.class);
@@ -350,7 +350,7 @@ public class ServerCnxAuthorizationTest {
AuthorizationService authorizationService =
spyWithClassAndConstructorArgsRecordingInvocations(AuthorizationService.class,
svcConfig,
- testPulsarServiceFactory.getPulsarResources());
+ pulsarTestContext.getPulsarResources());
doReturn(authorizationService).when(brokerService).getAuthorizationService();
// lookup
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index c4088d2e124..b990ced12b1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -71,7 +71,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.TestPulsarService;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.broker.TransactionMetadataStoreService;
import org.apache.pulsar.broker.auth.MockAuthenticationProvider;
import org.apache.pulsar.broker.auth.MockMultiStageAuthenticationProvider;
@@ -143,7 +143,7 @@ public class ServerCnxTest {
private ServiceConfiguration svcConfig;
private ServerCnx serverCnx;
- protected TestPulsarService.Factory testPulsarServiceFactory;
+ protected PulsarTestContext pulsarTestContext;
protected PulsarService pulsar;
protected BrokerService brokerService;
@@ -181,13 +181,13 @@ public class ServerCnxTest {
svcConfig.setKeepAliveIntervalSeconds(inSec(1, TimeUnit.SECONDS));
svcConfig.setBacklogQuotaCheckEnabled(false);
svcConfig.setClusterName("use");
- testPulsarServiceFactory = TestPulsarService.Factory.builder()
+ pulsarTestContext = PulsarTestContext.builder()
.config(svcConfig)
- .useSpies(true)
+ .spyByDefault()
.build();
- pulsar = testPulsarServiceFactory.getPulsarService();
+ pulsar = pulsarTestContext.getPulsarService();
- brokerService = testPulsarServiceFactory.getBrokerService();
+ brokerService = pulsarTestContext.getBrokerService();
namespaceService = pulsar.getNamespaceService();
doReturn(CompletableFuture.completedFuture(null)).when(namespaceService).getBundleAsync(any());
@@ -217,9 +217,9 @@ public class ServerCnxTest {
if (channel != null) {
channel.close();
}
- if (testPulsarServiceFactory != null) {
- testPulsarServiceFactory.close();
- testPulsarServiceFactory = null;
+ if (pulsarTestContext != null) {
+ pulsarTestContext.close();
+ pulsarTestContext = null;
}
}
@@ -895,7 +895,7 @@ public class ServerCnxTest {
() -> ((OpenLedgerCallback)
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock,
null));
return null;
- }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+ }).when(pulsarTestContext.getManagedLedgerFactory())
.asyncOpen(matches(".*success.*"),
any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class), any(Supplier.class),
any());
@@ -950,7 +950,7 @@ public class ServerCnxTest {
() -> ((OpenLedgerCallback)
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock,
null));
return null;
- }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+ }).when(pulsarTestContext.getManagedLedgerFactory())
.asyncOpen(matches(".*success.*"),
any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class), any(Supplier.class),
any());
@@ -1017,7 +1017,7 @@ public class ServerCnxTest {
((OpenLedgerCallback)
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
return null;
- }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+ }).when(pulsarTestContext.getManagedLedgerFactory())
.asyncOpen(matches(".*success.*"),
any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class), any(Supplier.class),
any());
@@ -1096,7 +1096,7 @@ public class ServerCnxTest {
() -> ((OpenLedgerCallback)
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock,
null));
return null;
- }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+ }).when(pulsarTestContext.getManagedLedgerFactory())
.asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class), any(Supplier.class),
any());
@@ -1168,7 +1168,7 @@ public class ServerCnxTest {
null));
return null;
- }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+ }).when(pulsarTestContext.getManagedLedgerFactory())
.asyncOpen(matches(".*success.*"),
any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class), any(Supplier.class),
any());
@@ -1243,7 +1243,7 @@ public class ServerCnxTest {
() -> ((OpenLedgerCallback)
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock,
null));
return null;
- }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+ }).when(pulsarTestContext.getManagedLedgerFactory())
.asyncOpen(matches(".*success.*"),
any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class), any(Supplier.class),
any());
@@ -1252,7 +1252,7 @@ public class ServerCnxTest {
openTopicFail.complete(() -> ((OpenLedgerCallback)
invocationOnMock.getArguments()[2])
.openLedgerFailed(new ManagedLedgerException("Managed
ledger failure"), null));
return null;
- }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+ }).when(pulsarTestContext.getManagedLedgerFactory())
.asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class), any(Supplier.class),
any());
@@ -1494,7 +1494,7 @@ public class ServerCnxTest {
// add `clusterDispatchRate` otherwise there will be a NPE
//
`org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceReplicatorDispatchRate`
policies.replicatorDispatchRate = new HashMap<>();
- testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
+ pulsarTestContext.getPulsarResources().getNamespaceResources()
.createPolicies(TopicName.get(encryptionRequiredTopicName).getNamespaceObject(),
policies);
@@ -1532,7 +1532,7 @@ public class ServerCnxTest {
// add `clusterDispatchRate` otherwise there will be a NPE
//
`org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceReplicatorDispatchRate`
policies.replicatorDispatchRate = new HashMap<>();
- testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
+ pulsarTestContext.getPulsarResources().getNamespaceResources()
.createPolicies(TopicName.get(encryptionRequiredTopicName).getNamespaceObject(),
policies);
@@ -1574,7 +1574,7 @@ public class ServerCnxTest {
// add `clusterDispatchRate` otherwise there will be a NPE
//
`org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceReplicatorDispatchRate`
policies.replicatorDispatchRate = new HashMap<>();
- testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
+ pulsarTestContext.getPulsarResources().getNamespaceResources()
.createPolicies(TopicName.get(encryptionRequiredTopicName).getNamespaceObject(),
policies);
@@ -1614,7 +1614,7 @@ public class ServerCnxTest {
// add `clusterDispatchRate` otherwise there will be a NPE
//
`org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceReplicatorDispatchRate`
policies.replicatorDispatchRate = new HashMap<>();
- testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
+ pulsarTestContext.getPulsarResources().getNamespaceResources()
.createPolicies(TopicName.get(encryptionRequiredTopicName).getNamespaceObject(),
policies);
@@ -1660,7 +1660,7 @@ public class ServerCnxTest {
// add `clusterDispatchRate` otherwise there will be a NPE
//
`org.apache.pulsar.broker.service.AbstractTopic.updateNamespaceReplicatorDispatchRate`
policies.replicatorDispatchRate = new HashMap<>();
- testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
+ pulsarTestContext.getPulsarResources().getNamespaceResources()
.createPolicies(TopicName.get(encryptionRequiredTopicName).getNamespaceObject(),
policies);
@@ -1740,7 +1740,7 @@ public class ServerCnxTest {
Thread.sleep(300);
((OpenLedgerCallback)
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
return null;
- }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+ }).when(pulsarTestContext.getManagedLedgerFactory())
.asyncOpen(matches(".*success.*"),
any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class), any(Supplier.class),
any());
@@ -1751,7 +1751,7 @@ public class ServerCnxTest {
.openLedgerFailed(new ManagedLedgerException("Managed
ledger failure"), null)).start();
return null;
- }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+ }).when(pulsarTestContext.getManagedLedgerFactory())
.asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class), any(Supplier.class),
any());
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherFailoverConsumerStreamingDispatcherTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherFailoverConsumerStreamingDispatcherTest.java
index 00e31c51c9c..f86fe0701dc 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherFailoverConsumerStreamingDispatcherTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherFailoverConsumerStreamingDispatcherTest.java
@@ -32,6 +32,6 @@ public class
PersistentDispatcherFailoverConsumerStreamingDispatcherTest extends
@BeforeMethod(alwaysRun = true)
public void setup() throws Exception {
super.setup();
- testPulsarServiceFactory.getConfig().setStreamingDispatch(true);
+ pulsarTestContext.getConfig().setStreamingDispatch(true);
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicStreamingDispatcherTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicStreamingDispatcherTest.java
index d7699e0b817..440cbbe290c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicStreamingDispatcherTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicStreamingDispatcherTest.java
@@ -33,7 +33,7 @@ public class PersistentTopicStreamingDispatcherTest extends
PersistentTopicTest
@BeforeMethod(alwaysRun = true)
public void setup() throws Exception {
super.setup();
- ServiceConfiguration config = testPulsarServiceFactory.getConfig();
+ ServiceConfiguration config = pulsarTestContext.getConfig();
config.setTopicLevelPoliciesEnabled(false);
config.setSystemTopicEnabled(false);
config.setStreamingDispatch(true);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java
new file mode 100644
index 00000000000..e8a9063a1ea
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.testcontext;
+
+import org.apache.pulsar.broker.BookKeeperClientFactory;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.intercept.BrokerInterceptor;
+import org.apache.pulsar.broker.service.PulsarMetadataEventSynchronizer;
+import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+
+abstract class AbstractTestPulsarService extends PulsarService {
+ protected final MetadataStoreExtended localMetadataStore;
+ protected final MetadataStoreExtended configurationMetadataStore;
+ protected final Compactor compactor;
+ protected final BrokerInterceptor brokerInterceptor;
+ protected final BookKeeperClientFactory bookKeeperClientFactory;
+
+ public AbstractTestPulsarService(ServiceConfiguration config,
MetadataStoreExtended localMetadataStore,
+ MetadataStoreExtended
configurationMetadataStore, Compactor compactor,
+ BrokerInterceptor brokerInterceptor,
+ BookKeeperClientFactory
bookKeeperClientFactory) {
+ super(config);
+ this.localMetadataStore =
+
NonClosingProxyHandler.createNonClosingProxy(localMetadataStore,
MetadataStoreExtended.class);
+ this.configurationMetadataStore =
+
NonClosingProxyHandler.createNonClosingProxy(configurationMetadataStore,
MetadataStoreExtended.class);
+ this.compactor = compactor;
+ this.brokerInterceptor = brokerInterceptor;
+ this.bookKeeperClientFactory = bookKeeperClientFactory;
+ }
+
+ @Override
+ public MetadataStore
createConfigurationMetadataStore(PulsarMetadataEventSynchronizer synchronizer)
+ throws MetadataStoreException {
+
+ return configurationMetadataStore;
+ }
+
+ @Override
+ public MetadataStoreExtended
createLocalMetadataStore(PulsarMetadataEventSynchronizer synchronizer)
+ throws MetadataStoreException, PulsarServerException {
+ return localMetadataStore;
+ }
+
+ @Override
+ public Compactor getCompactor() throws PulsarServerException {
+ if (compactor != null) {
+ return compactor;
+ } else {
+ return super.getCompactor();
+ }
+ }
+
+ @Override
+ public BrokerInterceptor getBrokerInterceptor() {
+ if (brokerInterceptor != null) {
+ return brokerInterceptor;
+ } else {
+ return super.getBrokerInterceptor();
+ }
+ }
+
+ @Override
+ public BookKeeperClientFactory newBookKeeperClientFactory() {
+ return bookKeeperClientFactory;
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/MockBookKeeperClientFactory.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/MockBookKeeperClientFactory.java
new file mode 100644
index 00000000000..c173be9046b
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/MockBookKeeperClientFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.testcontext;
+
+import io.netty.channel.EventLoopGroup;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.pulsar.broker.BookKeeperClientFactory;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+
+class MockBookKeeperClientFactory implements BookKeeperClientFactory {
+ private final BookKeeper mockBookKeeper;
+
+ MockBookKeeperClientFactory(BookKeeper mockBookKeeper) {
+ this.mockBookKeeper = mockBookKeeper;
+ }
+
+ @Override
+ public BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended
store,
+ EventLoopGroup eventLoopGroup,
+ Optional<Class<? extends
EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
+ Map<String, Object> properties) {
+ // Always return the same instance (so that we don't lose the mock BK
content on broker restart)
+ return mockBookKeeper;
+ }
+
+ @Override
+ public BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended
store,
+ EventLoopGroup eventLoopGroup,
+ Optional<Class<? extends
EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
+ Map<String, Object> properties, StatsLogger
statsLogger) {
+ // Always return the same instance (so that we don't lose the mock BK
content on broker restart)
+ return mockBookKeeper;
+ }
+
+ @Override
+ public void close() {
+ // no-op
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherFailoverConsumerStreamingDispatcherTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonClosableMockBookKeeper.java
similarity index 53%
copy from
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherFailoverConsumerStreamingDispatcherTest.java
copy to
pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonClosableMockBookKeeper.java
index 00e31c51c9c..23cc07d3bdf 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherFailoverConsumerStreamingDispatcherTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonClosableMockBookKeeper.java
@@ -16,22 +16,29 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.broker.service.persistent;
+package org.apache.pulsar.broker.testcontext;
-import
org.apache.pulsar.broker.service.PersistentDispatcherFailoverConsumerTest;
-import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
+import org.apache.bookkeeper.client.PulsarMockBookKeeper;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
-/**
- * PersistentDispatcherFailoverConsumerTest with {@link StreamingDispatcher}
- */
-@Test(groups = "quarantine")
-public class PersistentDispatcherFailoverConsumerStreamingDispatcherTest
extends PersistentDispatcherFailoverConsumerTest {
+// Prevent the MockBookKeeper instance from being closed when the broker is
restarted within a test
+class NonClosableMockBookKeeper extends PulsarMockBookKeeper {
+
+ public NonClosableMockBookKeeper(OrderedExecutor executor) throws
Exception {
+ super(executor);
+ }
+
+ @Override
+ public void close() {
+ // no-op
+ }
+
+ @Override
+ public void shutdown() {
+ // no-op
+ }
- @BeforeMethod(alwaysRun = true)
- public void setup() throws Exception {
- super.setup();
- testPulsarServiceFactory.getConfig().setStreamingDispatch(true);
+ public void reallyShutdown() {
+ super.shutdown();
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonClosingProxyHandler.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonClosingProxyHandler.java
new file mode 100644
index 00000000000..3156d25a74e
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonClosingProxyHandler.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.testcontext;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+
+public class NonClosingProxyHandler implements InvocationHandler {
+ private final AutoCloseable delegate;
+
+ NonClosingProxyHandler(AutoCloseable delegate) {
+ this.delegate = delegate;
+ }
+
+ public static <T extends AutoCloseable> T createNonClosingProxy(T
delegate, Class<T> interfaceClass) {
+ if (isNonClosingProxy(delegate)) {
+ return delegate;
+ }
+ return
interfaceClass.cast(Proxy.newProxyInstance(delegate.getClass().getClassLoader(),
+ new Class<?>[] {interfaceClass}, new
NonClosingProxyHandler(delegate)));
+ }
+
+ public static boolean isNonClosingProxy(Object instance) {
+ return Proxy.isProxyClass(instance.getClass())
+ && Proxy.getInvocationHandler(instance) instanceof
NonClosingProxyHandler;
+ }
+
+ public static <T extends I, I extends AutoCloseable> I getDelegate(T
instance) {
+ if (isNonClosingProxy(instance)) {
+ return (T) ((NonClosingProxyHandler)
Proxy.getInvocationHandler(instance)).getDelegate();
+ } else {
+ throw new IllegalArgumentException("not a proxy instance with
NonClosingProxyHandler");
+ }
+ }
+
+ public static <T extends AutoCloseable> void reallyClose(T instance)
throws Exception {
+ if (isNonClosingProxy(instance)) {
+ getDelegate(instance).close();
+ } else {
+ instance.close();
+ }
+ }
+
+ public AutoCloseable getDelegate() {
+ return delegate;
+ }
+
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] args) throws
Throwable {
+ if (method.getName().equals("close")) {
+ return null;
+ } else {
+ return method.invoke(delegate, args);
+ }
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java
new file mode 100644
index 00000000000..c65798ed7f6
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.testcontext;
+
+import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.mockito.Mockito.mock;
+import io.netty.channel.EventLoopGroup;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+import org.apache.pulsar.broker.BookKeeperClientFactory;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.intercept.BrokerInterceptor;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.resources.NamespaceResources;
+import org.apache.pulsar.broker.resources.PulsarResources;
+import org.apache.pulsar.broker.resources.TopicResources;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService;
+import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
+import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+
+/**
+ * Subclass of PulsarService that is used for some tests.
+ * This was written as a replacement for the previous Mockito Spy over PulsarService solution which caused
+ * a flaky test issue https://github.com/apache/pulsar/issues/13620.
+ *
+ * <p>Instances cannot be started: {@link #start()} always throws. Collaborators are either passed to the
+ * constructor or created there (as spies or mocks), and the accessors and factory methods below return
+ * those fixed instances.
+ */
+class NonStartableTestPulsarService extends AbstractTestPulsarService {
+    private final PulsarResources pulsarResources;
+    private final ManagedLedgerStorage managedLedgerClientFactory;
+    private final BrokerService brokerService;
+
+    private final SchemaRegistryService schemaRegistryService;
+
+    private final PulsarClientImpl pulsarClient;
+
+    private final NamespaceService namespaceService;
+
+    /**
+     * Creates the service and wires its collaborators.
+     *
+     * @param spyConfig decides how the {@link BrokerService} instance is created (plain or spied)
+     * @param pulsarResources instance returned by {@link #getPulsarResources()}
+     * @param managedLedgerClientFactory instance returned by {@link #getManagedLedgerClientFactory()}
+     * @throws RuntimeException wrapping any failure while creating the broker service or
+     *                          starting the namespace service
+     */
+    public NonStartableTestPulsarService(SpyConfig spyConfig, ServiceConfiguration config,
+                                         MetadataStoreExtended localMetadataStore,
+                                         MetadataStoreExtended configurationMetadataStore,
+                                         Compactor compactor, BrokerInterceptor brokerInterceptor,
+                                         BookKeeperClientFactory bookKeeperClientFactory,
+                                         PulsarResources pulsarResources,
+                                         ManagedLedgerStorage managedLedgerClientFactory) {
+        super(config, localMetadataStore, configurationMetadataStore, compactor, brokerInterceptor,
+                bookKeeperClientFactory);
+        this.pulsarResources = pulsarResources;
+        this.managedLedgerClientFactory = managedLedgerClientFactory;
+        try {
+            // The broker service receives this (still initializing) instance as its PulsarService.
+            this.brokerService =
+                    spyConfig.getBrokerService().spy(TestBrokerService.class, this, getIoEventLoopGroup());
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        this.schemaRegistryService = spyWithClassAndConstructorArgs(DefaultSchemaRegistryService.class);
+        this.pulsarClient = mock(PulsarClientImpl.class);
+        this.namespaceService = mock(NamespaceService.class);
+        try {
+            startNamespaceService();
+        } catch (PulsarServerException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Always fails; this test service is not meant to be started.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void start() throws PulsarServerException {
+        throw new UnsupportedOperationException("Cannot start a non-startable TestPulsarService");
+    }
+
+    /** Supplies the mocked {@link NamespaceService} created in the constructor. */
+    @Override
+    public Supplier<NamespaceService> getNamespaceServiceProvider() throws PulsarServerException {
+        return () -> namespaceService;
+    }
+
+    /** Returns the mocked client created in the constructor. */
+    @Override
+    public synchronized PulsarClient getClient() throws PulsarServerException {
+        return pulsarClient;
+    }
+
+    /** Ignores {@code clientConf} and returns the mocked client created in the constructor. */
+    @Override
+    public PulsarClientImpl createClientImpl(ClientConfigurationData clientConf) throws PulsarClientException {
+        return pulsarClient;
+    }
+
+    @Override
+    public SchemaRegistryService getSchemaRegistryService() {
+        return schemaRegistryService;
+    }
+
+    @Override
+    public PulsarResources getPulsarResources() {
+        return pulsarResources;
+    }
+
+    @Override
+    public BrokerService getBrokerService() {
+        return brokerService;
+    }
+
+    @Override
+    public MetadataStore getConfigurationMetadataStore() {
+        return configurationMetadataStore;
+    }
+
+    @Override
+    public MetadataStoreExtended getLocalMetadataStore() {
+        return localMetadataStore;
+    }
+
+    @Override
+    public ManagedLedgerStorage getManagedLedgerClientFactory() {
+        return managedLedgerClientFactory;
+    }
+
+    // The factory hooks below hand back the pre-built instances instead of creating new ones.
+
+    @Override
+    protected PulsarResources newPulsarResources() {
+        return pulsarResources;
+    }
+
+    @Override
+    protected ManagedLedgerStorage newManagedLedgerClientFactory() throws Exception {
+        return managedLedgerClientFactory;
+    }
+
+    @Override
+    protected BrokerService newBrokerService(PulsarService pulsar) throws Exception {
+        return brokerService;
+    }
+
+    @Override
+    public BookKeeperClientFactory getBookKeeperClientFactory() {
+        return bookKeeperClientFactory;
+    }
+
+    /** {@link BrokerService} whose topic-properties lookup completes immediately with an empty map. */
+    static class TestBrokerService extends BrokerService {
+
+        TestBrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws Exception {
+            super(pulsar, eventLoopGroup);
+        }
+
+        @Override
+        protected CompletableFuture<Map<String, String>> fetchTopicPropertiesAsync(TopicName topicName) {
+            return CompletableFuture.completedFuture(Collections.emptyMap());
+        }
+    }
+
+    /** {@link PulsarResources} that returns the topic and namespace resources given to its constructor. */
+    static class TestPulsarResources extends PulsarResources {
+
+        private final TopicResources topicResources;
+        private final NamespaceResources namespaceResources;
+
+        public TestPulsarResources(MetadataStore localMetadataStore, MetadataStore configurationMetadataStore,
+                                   TopicResources topicResources, NamespaceResources namespaceResources) {
+            super(localMetadataStore, configurationMetadataStore);
+            this.topicResources = topicResources;
+            this.namespaceResources = namespaceResources;
+        }
+
+        @Override
+        public TopicResources getTopicResources() {
+            return topicResources;
+        }
+
+        @Override
+        public NamespaceResources getNamespaceResources() {
+            return namespaceResources;
+        }
+    }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
new file mode 100644
index 00000000000..f42c3cb146e
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.testcontext;
+
+import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgsRecordingInvocations;
+import static org.mockito.Mockito.mock;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.netty.channel.EventLoopGroup;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import lombok.AccessLevel;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.Singular;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.stats.NullStatsProvider;
+import org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.pulsar.broker.BookKeeperClientFactory;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.intercept.BrokerInterceptor;
+import org.apache.pulsar.broker.resources.NamespaceResources;
+import org.apache.pulsar.broker.resources.PulsarResources;
+import org.apache.pulsar.broker.resources.TopicResources;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.ServerCnx;
+import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
+import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown;
+import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.impl.MetadataStoreFactoryImpl;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.MockZooKeeper;
+import org.apache.zookeeper.MockZooKeeperSession;
+import org.apache.zookeeper.data.ACL;
+import org.jetbrains.annotations.NotNull;
+
+@Slf4j
+@ToString
+@Getter
+@Builder(builderClassName = "Builder")
+public class PulsarTestContext implements AutoCloseable {
+ private final ServiceConfiguration config;
+ private final MetadataStoreExtended localMetadataStore;
+ private final MetadataStoreExtended configurationMetadataStore;
+ private final PulsarResources pulsarResources;
+
+ private final OrderedExecutor executor;
+
+ private final ManagedLedgerStorage managedLedgerClientFactory;
+
+ private final PulsarService pulsarService;
+
+ private final Compactor compactor;
+
+ private final BrokerService brokerService;
+
+ @Getter(AccessLevel.NONE)
+ @Singular("registerCloseable")
+ private final List<AutoCloseable> closeables;
+
+ private final BrokerInterceptor brokerInterceptor;
+
+ private final BookKeeper bookKeeperClient;
+
+ private final boolean startable;
+
+
+ public ManagedLedgerFactory getManagedLedgerFactory() {
+ return managedLedgerClientFactory.getManagedLedgerFactory();
+ }
+
+ public static Builder startableBuilder() {
+ return new StartableCustomBuilder();
+ }
+
+ public static Builder builder() {
+ return new NonStartableCustomBuilder();
+ }
+
+    /**
+     * Closes all registered closeables, last registered first.
+     * An exception thrown by one closeable is logged and does not prevent the remaining ones
+     * from being closed.
+     */
+    @Override
+    public void close() throws Exception {
+        // Walk backwards so that resources are released in the reverse of their registration order.
+        for (int i = closeables.size() - 1; i >= 0; i--) {
+            try {
+                closeables.get(i).close();
+            } catch (Exception e) {
+                log.error("Failure in calling cleanup function", e);
+            }
+        }
+    }
+
+ public ServerCnx createServerCnxSpy() {
+ return
spyWithClassAndConstructorArgsRecordingInvocations(ServerCnx.class,
+ getPulsarService());
+ }
+
+ public static class Builder {
+ protected boolean useTestPulsarResources = false;
+ protected MetadataStore pulsarResourcesMetadataStore;
+ protected Function<PulsarService, BrokerService> brokerServiceFunction;
+ protected SpyConfig.Builder spyConfigBuilder =
SpyConfig.builder(SpyConfig.SpyType.NONE);
+
+ public Builder spyByDefault() {
+ spyConfigBuilder = SpyConfig.builder(SpyConfig.SpyType.SPY);
+ return this;
+ }
+
+ public Builder spyConfig(Consumer<SpyConfig.Builder>
spyConfigCustomizer) {
+ spyConfigCustomizer.accept(spyConfigBuilder);
+ return this;
+ }
+
+
+ public Builder reuseMockBookkeeperAndMetadataStores(PulsarTestContext
otherContext) {
+ bookKeeperClient(otherContext.getBookKeeperClient());
+
localMetadataStore(NonClosingProxyHandler.createNonClosingProxy(otherContext.getLocalMetadataStore(),
+ MetadataStoreExtended.class
+ ));
+
configurationMetadataStore(NonClosingProxyHandler.createNonClosingProxy(
+ otherContext.getConfigurationMetadataStore(),
MetadataStoreExtended.class
+ ));
+ return this;
+ }
+
+ public Builder withMockZookeeper() {
+ try {
+ MockZooKeeper mockZooKeeper = createMockZooKeeper();
+ registerCloseable(mockZooKeeper::shutdown);
+ MockZooKeeperSession mockZooKeeperSession =
MockZooKeeperSession.newInstance(mockZooKeeper);
+ ZKMetadataStore zkMetadataStore = new
ZKMetadataStore(mockZooKeeperSession);
+ registerCloseable(zkMetadataStore::close);
+ localMetadataStore(zkMetadataStore);
+ configurationMetadataStore(zkMetadataStore);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return this;
+ }
+
+ private static MockZooKeeper createMockZooKeeper() throws Exception {
+ MockZooKeeper zk =
MockZooKeeper.newInstance(MoreExecutors.newDirectExecutorService());
+ List<ACL> dummyAclList = new ArrayList<>(0);
+
+ ZkUtils.createFullPathOptimistic(zk,
"/ledgers/available/192.168.1.1:" + 5000,
+ "".getBytes(StandardCharsets.UTF_8), dummyAclList,
CreateMode.PERSISTENT);
+
+ zk.create("/ledgers/LAYOUT",
"1\nflat:1".getBytes(StandardCharsets.UTF_8), dummyAclList,
+ CreateMode.PERSISTENT);
+ return zk;
+ }
+
+ public Builder useTestPulsarResources() {
+ if (startable) {
+ throw new IllegalStateException("Cannot useTestPulsarResources
when startable.");
+ }
+ useTestPulsarResources = true;
+ return this;
+ }
+
+ public Builder useTestPulsarResources(MetadataStore metadataStore) {
+ if (startable) {
+ throw new IllegalStateException("Cannot useTestPulsarResources
when startable.");
+ }
+ useTestPulsarResources = true;
+ pulsarResourcesMetadataStore = metadataStore;
+ return this;
+ }
+
+ public Builder managedLedgerClients(BookKeeper bookKeeperClient,
+ ManagedLedgerFactory
managedLedgerFactory) {
+ return managedLedgerClientFactory(
+
PulsarTestContext.createManagedLedgerClientFactory(bookKeeperClient,
managedLedgerFactory));
+ }
+
+ public Builder brokerServiceFunction(
+ Function<PulsarService, BrokerService> brokerServiceFunction) {
+ this.brokerServiceFunction = brokerServiceFunction;
+ return this;
+ }
+ }
+
+ static abstract class AbstractCustomBuilder extends Builder {
+ AbstractCustomBuilder(boolean startable) {
+ super.startable = startable;
+ }
+
+ public Builder startable(boolean startable) {
+ throw new UnsupportedOperationException("Cannot change
startability after builder creation.");
+ }
+
+ @Override
+ public final PulsarTestContext build() {
+ SpyConfig spyConfig = spyConfigBuilder.build();
+ if (super.config == null) {
+ ServiceConfiguration svcConfig = new ServiceConfiguration();
+ initializeConfig(svcConfig);
+ config(svcConfig);
+ }
+ initializeCommonPulsarServices(spyConfig);
+ initializePulsarServices(spyConfig, this);
+ if (super.startable) {
+ try {
+ super.pulsarService.start();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ brokerService(super.pulsarService.getBrokerService());
+ return super.build();
+ }
+
+ protected void initializeConfig(ServiceConfiguration svcConfig) {
+ svcConfig.setBrokerShutdownTimeoutMs(0L);
+
svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
+ svcConfig.setClusterName("pulsar-cluster");
+ svcConfig.setNumIOThreads(1);
+ svcConfig.setNumOrderedExecutorThreads(1);
+ svcConfig.setNumExecutorThreadPoolSize(2);
+ svcConfig.setNumCacheExecutorThreadPoolSize(2);
+ svcConfig.setNumHttpServerThreads(2);
+ }
+
+ private void initializeCommonPulsarServices(SpyConfig spyConfig) {
+ if (super.bookKeeperClient == null &&
super.managedLedgerClientFactory == null) {
+ if (super.executor == null) {
+ OrderedExecutor createdExecutor =
OrderedExecutor.newBuilder().numThreads(1)
+
.name(NonStartableTestPulsarService.class.getSimpleName() +
"-executor").build();
+ registerCloseable(() ->
GracefulExecutorServicesShutdown.initiate()
+ .timeout(Duration.ZERO)
+ .shutdown(createdExecutor)
+ .handle().get());
+ super.executor = createdExecutor;
+ }
+ NonClosableMockBookKeeper mockBookKeeper;
+ try {
+ mockBookKeeper =
+
spyConfig.getBookKeeperClient().spy(NonClosableMockBookKeeper.class,
super.executor);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ registerCloseable(mockBookKeeper::reallyShutdown);
+ bookKeeperClient(mockBookKeeper);
+ }
+ if (super.bookKeeperClient == null &&
super.managedLedgerClientFactory != null) {
+
bookKeeperClient(super.managedLedgerClientFactory.getBookKeeperClient());
+ }
+ if (super.localMetadataStore == null ||
super.configurationMetadataStore == null) {
+ try {
+ MetadataStoreExtended store =
MetadataStoreFactoryImpl.createExtended("memory:local",
+ MetadataStoreConfig.builder().build());
+ registerCloseable(store::close);
+ if (super.localMetadataStore == null) {
+ localMetadataStore(store);
+ }
+ if (super.configurationMetadataStore == null) {
+ configurationMetadataStore(store);
+ }
+ } catch (MetadataStoreException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ protected abstract void initializePulsarServices(SpyConfig spyConfig,
Builder builder);
+ }
+
+
+ static class StartableCustomBuilder extends AbstractCustomBuilder {
+ StartableCustomBuilder() {
+ super(true);
+ }
+
+ @Override
+ public Builder managedLedgerClientFactory(ManagedLedgerStorage
managedLedgerClientFactory) {
+ throw new IllegalStateException("Cannot set
managedLedgerClientFactory when startable.");
+ }
+
+ @Override
+ public Builder pulsarResources(PulsarResources pulsarResources) {
+ throw new IllegalStateException("Cannot set pulsarResources when
startable.");
+ }
+
+ @Override
+ protected void initializePulsarServices(SpyConfig spyConfig, Builder
builder) {
+ BookKeeperClientFactory bookKeeperClientFactory =
+ new MockBookKeeperClientFactory(builder.bookKeeperClient);
+ PulsarService pulsarService = spyConfig.getPulsarBroker()
+ .spy(StartableTestPulsarService.class, builder.config,
builder.localMetadataStore,
+ builder.configurationMetadataStore,
builder.compactor, builder.brokerInterceptor,
+ bookKeeperClientFactory);
+ registerCloseable(pulsarService::close);
+ pulsarService(pulsarService);
+ }
+
+ @Override
+ protected void initializeConfig(ServiceConfiguration svcConfig) {
+ super.initializeConfig(svcConfig);
+ svcConfig.setBrokerShutdownTimeoutMs(5000L);
+ }
+ }
+
+ static class NonStartableCustomBuilder extends AbstractCustomBuilder {
+
+ NonStartableCustomBuilder() {
+ super(false);
+ }
+
+ @Override
+ protected void initializePulsarServices(SpyConfig spyConfig, Builder
builder) {
+ if (builder.managedLedgerClientFactory == null) {
+ ManagedLedgerFactory mlFactoryMock =
mock(ManagedLedgerFactory.class);
+ managedLedgerClientFactory(
+
PulsarTestContext.createManagedLedgerClientFactory(builder.bookKeeperClient,
mlFactoryMock));
+ }
+ if (builder.pulsarResources == null) {
+ SpyConfig.SpyType spyConfigPulsarResources =
spyConfig.getPulsarResources();
+ if (useTestPulsarResources) {
+ MetadataStore metadataStore = pulsarResourcesMetadataStore;
+ if (metadataStore == null) {
+ metadataStore = builder.configurationMetadataStore;
+ }
+ NamespaceResources nsr =
spyConfigPulsarResources.spy(NamespaceResources.class, metadataStore, 30);
+ TopicResources tsr =
spyConfigPulsarResources.spy(TopicResources.class, metadataStore);
+ pulsarResources(
+ spyConfigPulsarResources.spy(
+
NonStartableTestPulsarService.TestPulsarResources.class,
builder.localMetadataStore,
+ builder.configurationMetadataStore,
+ tsr, nsr));
+ } else {
+ pulsarResources(
+
spyConfigPulsarResources.spy(PulsarResources.class, builder.localMetadataStore,
+ builder.configurationMetadataStore));
+ }
+ }
+ BookKeeperClientFactory bookKeeperClientFactory =
+ new MockBookKeeperClientFactory(builder.bookKeeperClient);
+ PulsarService pulsarService = spyConfig.getPulsarBroker()
+ .spy(NonStartableTestPulsarService.class, spyConfig,
builder.config, builder.localMetadataStore,
+ builder.configurationMetadataStore,
builder.compactor, builder.brokerInterceptor,
+ bookKeeperClientFactory, builder.pulsarResources,
+ builder.managedLedgerClientFactory);
+ registerCloseable(pulsarService::close);
+ pulsarService(pulsarService);
+ }
+ }
+
+ @NotNull
+ private static ManagedLedgerStorage
createManagedLedgerClientFactory(BookKeeper bookKeeperClient,
+
ManagedLedgerFactory managedLedgerFactory) {
+ return new ManagedLedgerStorage() {
+
+ @Override
+ public void initialize(ServiceConfiguration conf,
MetadataStoreExtended metadataStore,
+ BookKeeperClientFactory bookkeeperProvider,
EventLoopGroup eventLoopGroup)
+ throws Exception {
+
+ }
+
+ @Override
+ public ManagedLedgerFactory getManagedLedgerFactory() {
+ return managedLedgerFactory;
+ }
+
+ @Override
+ public StatsProvider getStatsProvider() {
+ return new NullStatsProvider();
+ }
+
+ @Override
+ public BookKeeper getBookKeeperClient() {
+ return bookKeeperClient;
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+ };
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/SpyConfig.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/SpyConfig.java
new file mode 100644
index 00000000000..2e3b863cc8a
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/SpyConfig.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.testcontext;
+
+import lombok.Value;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.mockito.Mockito;
+import org.mockito.internal.creation.instance.ConstructorInstantiator;
+
+/**
+ * Per-component choice of how the test context creates its objects: as plain instances, as spies,
+ * or as spies that also record invocations. Build instances with {@link #builder()} or
+ * {@link #builder(SpyType)}.
+ */
[email protected](builderClassName = "Builder", toBuilder = true)
+@Value
+public class SpyConfig {
+    /** How an object should be wrapped (if at all) when it is created for a test. */
+    public enum SpyType {
+        /** Use the object as-is. */
+        NONE,
+        /** Spy created through {@link BrokerTestUtil}'s helpers that do not record invocations. */
+        SPY,
+        /** Spy that also records invocations. */
+        SPY_ALSO_INVOCATIONS;
+
+        /**
+         * Applies this spy type to an existing object.
+         *
+         * @param object the instance to wrap; may be {@code null}
+         * @return {@code null} for {@code null} input, the object itself for {@link #NONE},
+         *         otherwise a spy of the object
+         */
+        public <T> T spy(T object) {
+            if (object == null) {
+                return null;
+            }
+            switch (this) {
+                case NONE:
+                    return object;
+                case SPY:
+                    return BrokerTestUtil.spyWithoutRecordingInvocations(object);
+                case SPY_ALSO_INVOCATIONS:
+                    return Mockito.spy(object);
+                default:
+                    throw new UnsupportedOperationException("Unknown spy type: " + this);
+            }
+        }
+
+        /**
+         * Instantiates {@code clazz} with the given constructor arguments and applies this spy type.
+         *
+         * @param clazz the class to instantiate
+         * @param args the constructor arguments
+         * @return a plain instance for {@link #NONE}, otherwise a spy created by {@link BrokerTestUtil}
+         */
+        public <T> T spy(Class<T> clazz, Object... args) {
+            switch (this) {
+                case NONE:
+                    // Use Mockito's internal class to instantiate the object
+                    return new ConstructorInstantiator(false, args).newInstance(clazz);
+                case SPY:
+                    return BrokerTestUtil.spyWithClassAndConstructorArgs(clazz, args);
+                case SPY_ALSO_INVOCATIONS:
+                    return BrokerTestUtil.spyWithClassAndConstructorArgsRecordingInvocations(clazz, args);
+                default:
+                    throw new UnsupportedOperationException("Unknown spy type: " + this);
+            }
+        }
+    }
+
+    private final SpyType pulsarBroker;
+    private final SpyType pulsarResources;
+    private final SpyType brokerService;
+    private final SpyType bookKeeperClient;
+
+    /** Returns a builder in which every component defaults to {@link SpyType#NONE}. */
+    public static Builder builder() {
+        return builder(SpyType.NONE);
+    }
+
+    /**
+     * Returns a builder in which every component is preset to {@code defaultSpyType};
+     * individual components can still be overridden on the returned builder.
+     */
+    public static Builder builder(SpyType defaultSpyType) {
+        Builder spyConfigBuilder = new Builder();
+        spyConfigBuilder.pulsarBroker(defaultSpyType);
+        spyConfigBuilder.pulsarResources(defaultSpyType);
+        spyConfigBuilder.brokerService(defaultSpyType);
+        spyConfigBuilder.bookKeeperClient(defaultSpyType);
+        return spyConfigBuilder;
+    }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/StartableTestPulsarService.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/StartableTestPulsarService.java
new file mode 100644
index 00000000000..a171ce97c11
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/StartableTestPulsarService.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.testcontext;
+
+import org.apache.pulsar.broker.BookKeeperClientFactory;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.intercept.BrokerInterceptor;
+import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+
+/**
+ * Test service that adds nothing of its own: its only member is a constructor that passes every
+ * argument straight through to {@link AbstractTestPulsarService}.
+ */
+class StartableTestPulsarService extends AbstractTestPulsarService {
+    public StartableTestPulsarService(
+            ServiceConfiguration config,
+            MetadataStoreExtended localMetadataStore,
+            MetadataStoreExtended configurationMetadataStore,
+            Compactor compactor,
+            BrokerInterceptor brokerInterceptor,
+            BookKeeperClientFactory bookKeeperClientFactory) {
+        super(
+                config,
+                localMetadataStore,
+                configurationMetadataStore,
+                compactor,
+                brokerInterceptor,
+                bookKeeperClientFactory);
+    }
+}