This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b3432f4ed9c [fix][test] Replace PulsarService Mockito spy solution for
overriding getters in tests (#19323)
b3432f4ed9c is described below
commit b3432f4ed9c9c19eef4ed696253eb2c18ebbf59d
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Jan 25 16:50:50 2023 +0200
[fix][test] Replace PulsarService Mockito spy solution for overriding
getters in tests (#19323)
---
.../org/apache/pulsar/broker/PulsarService.java | 33 +-
.../pulsar/broker/service/BrokerService.java | 14 +-
.../apache/pulsar/broker/service/ServerCnx.java | 4 +-
.../pulsar/broker/PulsarServiceMockSupport.java | 75 ---
.../apache/pulsar/broker/TestPulsarService.java | 420 +++++++++++++++
.../broker/service/MessageCumulativeAckTest.java | 73 +--
.../PersistentDispatcherFailoverConsumerTest.java | 187 +++----
.../pulsar/broker/service/PersistentTopicTest.java | 563 ++++++++-------------
.../broker/service/ServerCnxAuthorizationTest.java | 108 ++--
...herFailoverConsumerStreamingDispatcherTest.java | 2 +-
.../PersistentTopicStreamingDispatcherTest.java | 8 +-
.../prometheus/NamespaceStatsAggregatorTest.java | 7 +-
12 files changed, 771 insertions(+), 723 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index c33b7a0c2d1..1532b28343c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -627,7 +627,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
Duration.ofMillis(Math.max(1L,
getConfiguration().getBrokerShutdownTimeoutMs())),
shutdownExecutor, () ->
FutureUtil.createTimeoutException("Timeout in close", getClass(), "close"));
future.handle((v, t) -> {
- if (t != null) {
+ if (t != null && getConfiguration().getBrokerShutdownTimeoutMs() >
0) {
LOG.info("Shutdown timed out after {} ms",
getConfiguration().getBrokerShutdownTimeoutMs());
LOG.info(ThreadDumpUtil.buildThreadDiagnosticString());
}
@@ -740,10 +740,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
configurationMetadataStore = localMetadataStore;
shouldShutdownConfigurationMetadataStore = false;
}
- pulsarResources = new PulsarResources(localMetadataStore,
configurationMetadataStore,
- config.getMetadataStoreOperationTimeoutSeconds());
-
-
pulsarResources.getClusterResources().getStore().registerListener(this::handleDeleteCluster);
+ pulsarResources = newPulsarResources();
orderedExecutor = OrderedExecutor.newBuilder()
.numThreads(config.getNumOrderedExecutorThreads())
@@ -757,10 +754,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
// Now we are ready to start services
this.bkClientFactory = newBookKeeperClientFactory();
- managedLedgerClientFactory = ManagedLedgerStorage.create(
- config, localMetadataStore,
- bkClientFactory, ioEventLoopGroup
- );
+ managedLedgerClientFactory = newManagedLedgerClientFactory();
this.brokerService = newBrokerService(this);
@@ -925,6 +919,23 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
}
}
+ @VisibleForTesting
+ protected ManagedLedgerStorage newManagedLedgerClientFactory() throws
Exception {
+ return ManagedLedgerStorage.create(
+ config, localMetadataStore,
+ bkClientFactory, ioEventLoopGroup
+ );
+ }
+
+ @VisibleForTesting
+ protected PulsarResources newPulsarResources() {
+ PulsarResources pulsarResources = new
PulsarResources(localMetadataStore, configurationMetadataStore,
+ config.getMetadataStoreOperationTimeoutSeconds());
+
+
pulsarResources.getClusterResources().getStore().registerListener(this::handleDeleteCluster);
+ return pulsarResources;
+ }
+
private synchronized void createMetricsServlet() {
this.metricsServlet = new PulsarPrometheusMetricsServlet(
this, config.isExposeTopicLevelMetricsInPrometheus(),
@@ -1316,11 +1327,11 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
}
public BookKeeper getBookKeeperClient() {
- return managedLedgerClientFactory.getBookKeeperClient();
+ return getManagedLedgerClientFactory().getBookKeeperClient();
}
public ManagedLedgerFactory getManagedLedgerFactory() {
- return managedLedgerClientFactory.getManagedLedgerFactory();
+ return getManagedLedgerClientFactory().getManagedLedgerFactory();
}
public ManagedLedgerStorage getManagedLedgerClientFactory() {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 5c272a97f2e..618cc089560 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -934,7 +934,8 @@ public class BrokerService implements Closeable {
// unload all namespace-bundles gracefully
long closeTopicsStartTime = System.nanoTime();
- Set<NamespaceBundle> serviceUnits =
pulsar.getNamespaceService().getOwnedServiceUnits();
+ Set<NamespaceBundle> serviceUnits =
+ pulsar.getNamespaceService() != null ?
pulsar.getNamespaceService().getOwnedServiceUnits() : null;
if (serviceUnits != null) {
try (RateLimiter rateLimiter = maxConcurrentUnload > 0 ?
RateLimiter.builder()
.scheduledExecutorService(pulsar.getExecutor())
@@ -955,12 +956,13 @@ public class BrokerService implements Closeable {
}
});
}
- }
- double closeTopicsTimeSeconds =
TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - closeTopicsStartTime))
- / 1000.0;
- log.info("Unloading {} namespace-bundles completed in {} seconds",
serviceUnits.size(),
- closeTopicsTimeSeconds);
+ double closeTopicsTimeSeconds =
+ TimeUnit.NANOSECONDS.toMillis((System.nanoTime() -
closeTopicsStartTime))
+ / 1000.0;
+ log.info("Unloading {} namespace-bundles completed in {}
seconds", serviceUnits.size(),
+ closeTopicsTimeSeconds);
+ }
} catch (Exception e) {
log.error("Failed to disable broker from loadbalancer list {}",
e.getMessage(), e);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 2a83252c309..71db6e63883 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2777,7 +2777,9 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
* consumers from connection-map.
*/
protected void close() {
- ctx.close();
+ if (ctx != null) {
+ ctx.close();
+ }
}
@Override
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceMockSupport.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceMockSupport.java
deleted file mode 100644
index 9aa53164ee7..00000000000
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceMockSupport.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker;
-
-import static org.mockito.Mockito.mock;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
-import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
-import org.apache.pulsar.metadata.impl.ZKMetadataStore;
-import org.mockito.Mockito;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-@Slf4j
-public class PulsarServiceMockSupport {
-
- /**
- * see: https://github.com/apache/pulsar/pull/16821.
- * While executing
"doReturn(pulsarResources).when(pulsar).getPulsarResources()", Meta Store
Thread also accesses
- * variable PulsarService.getPulsarResources() asynchronously in logic:
"notification by zk-watcher".
- * So execute mock-cmd in meta-thread (The executor of MetaStore is a
single thread pool executor, so all things
- * will be thread safety).
- * Note: If the MetaStore's executor is no longer single-threaded, should
mock as single-threaded if you need to
- * execute this method.
- */
- public static void mockPulsarServiceProps(final PulsarService
pulsarService, Runnable mockTask)
- throws ExecutionException, InterruptedException, TimeoutException {
- final CompletableFuture<Void> mockGetPulsarResourceFuture = new
CompletableFuture<>();
- MetadataStoreExtended metadataStoreExtended =
pulsarService.getLocalMetadataStore();
- if (metadataStoreExtended instanceof AbstractMetadataStore){
- AbstractMetadataStore abstractMetadataStore =
(AbstractMetadataStore) metadataStoreExtended;
- abstractMetadataStore.execute(() -> {
- mockTask.run();
- mockGetPulsarResourceFuture.complete(null);
- }, mock(CompletableFuture.class));
- try {
- mockGetPulsarResourceFuture.get(1, TimeUnit.SECONDS);
- } catch (TimeoutException timeoutException){
- mockTask.run();
- }
- } else {
- mockTask.run();
- }
- }
-
- @Test
- public void testMockMetaStore() throws Exception{
- AtomicInteger integer = new AtomicInteger();
- PulsarService pulsarService = Mockito.mock(PulsarService.class);
-
Mockito.when(pulsarService.getLocalMetadataStore()).thenReturn(Mockito.mock(ZKMetadataStore.class));
- mockPulsarServiceProps(pulsarService, () -> integer.incrementAndGet());
- Assert.assertEquals(integer.get(), 1);
- }
-}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/TestPulsarService.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TestPulsarService.java
new file mode 100644
index 00000000000..873be15b8ff
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TestPulsarService.java
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker;
+
+import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgsRecordingInvocations;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.function.Function;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.ToString;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.stats.NullStatsProvider;
+import org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.pulsar.broker.resources.NamespaceResources;
+import org.apache.pulsar.broker.resources.PulsarResources;
+import org.apache.pulsar.broker.resources.TopicResources;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.PulsarMetadataEventSynchronizer;
+import org.apache.pulsar.broker.service.ServerCnx;
+import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService;
+import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
+import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
+import org.apache.pulsar.broker.transaction.TransactionTestBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown;
+import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.impl.MetadataStoreFactoryImpl;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Subclass of PulsarService that is used for some tests.
+ * This was written as a replacement for the previous Mockito Spy over
PulsarService solution which caused
+ * a flaky test issue https://github.com/apache/pulsar/issues/13620.
+ */
+public class TestPulsarService extends PulsarService {
+
+
+ @ToString
+ @Getter
+ @Builder
+ public static class Factory implements AutoCloseable {
+ private final ServiceConfiguration config;
+ private final MetadataStoreExtended localMetadataStore;
+ private final MetadataStoreExtended configurationMetadataStore;
+ private final PulsarResources pulsarResources;
+
+ private final OrderedExecutor executor;
+
+ private final EventLoopGroup eventLoopGroup;
+
+ private final ManagedLedgerStorage managedLedgerClientFactory;
+
+ private final Function<PulsarService, BrokerService>
brokerServiceFunction;
+
+ private final boolean useSpies;
+
+ private final PulsarService pulsarService;
+
+ private final Compactor compactor;
+
+ private final BrokerService brokerService;
+
+ public ManagedLedgerFactory getManagedLedgerFactory() {
+ return managedLedgerClientFactory.getManagedLedgerFactory();
+ }
+
+ public static FactoryBuilder builder() {
+ return new CustomFactoryBuilder();
+ }
+
+ public void close() throws Exception {
+ pulsarService.getBrokerService().close();
+ pulsarService.close();
+ GracefulExecutorServicesShutdown.initiate()
+ .timeout(Duration.ZERO)
+ .shutdown(executor)
+ .handle().get();
+ eventLoopGroup.shutdownGracefully().get();
+ if (localMetadataStore != configurationMetadataStore) {
+ localMetadataStore.close();
+ configurationMetadataStore.close();
+ } else {
+ localMetadataStore.close();
+ }
+ }
+
+ public ServerCnx createServerCnxSpy() {
+ return
spyWithClassAndConstructorArgsRecordingInvocations(ServerCnx.class,
+ getPulsarService());
+ }
+
+ public static class FactoryBuilder {
+ protected boolean useTestPulsarResources = false;
+ protected MetadataStore pulsarResourcesMetadataStore;
+
+ public FactoryBuilder useTestPulsarResources() {
+ useTestPulsarResources = true;
+ return this;
+ }
+
+ public FactoryBuilder useTestPulsarResources(MetadataStore
metadataStore) {
+ useTestPulsarResources = true;
+ pulsarResourcesMetadataStore = metadataStore;
+ return this;
+ }
+
+ public FactoryBuilder managedLedgerClients(BookKeeper
bookKeeperClient,
+ ManagedLedgerFactory
managedLedgerFactory) {
+ return managedLedgerClientFactory(
+
Factory.createManagedLedgerClientFactory(bookKeeperClient,
managedLedgerFactory));
+ }
+ }
+
+ private static class CustomFactoryBuilder extends
Factory.FactoryBuilder {
+
+ @Override
+ public Factory build() {
+ initializeDefaults();
+ return super.build();
+ }
+
+ private void initializeDefaults() {
+ try {
+ if (super.managedLedgerClientFactory == null) {
+ if (super.executor == null) {
+ super.executor =
OrderedExecutor.newBuilder().numThreads(1)
+
.name(TestPulsarService.class.getSimpleName() + "-executor").build();
+ }
+ TransactionTestBase.NonClosableMockBookKeeper
mockBookKeeper =
+
TransactionTestBase.createMockBookKeeper(super.executor);
+ ManagedLedgerFactory mlFactoryMock =
mock(ManagedLedgerFactory.class);
+
+ managedLedgerClientFactory(
+
Factory.createManagedLedgerClientFactory(mockBookKeeper, mlFactoryMock));
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ if (super.config == null) {
+ ServiceConfiguration svcConfig =
spy(ServiceConfiguration.class);
+ svcConfig.setBrokerShutdownTimeoutMs(0L);
+
svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
+ svcConfig.setClusterName("pulsar-cluster");
+ config(svcConfig);
+ }
+ if (super.localMetadataStore == null ||
super.configurationMetadataStore == null) {
+ try {
+ MetadataStoreExtended store =
MetadataStoreFactoryImpl.createExtended("memory:local",
+ MetadataStoreConfig.builder().build());
+ if (super.localMetadataStore == null) {
+ localMetadataStore(store);
+ }
+ if (super.configurationMetadataStore == null) {
+ configurationMetadataStore(store);
+ }
+ } catch (MetadataStoreException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ if (super.pulsarResources == null) {
+ if (useTestPulsarResources) {
+ MetadataStore metadataStore =
pulsarResourcesMetadataStore;
+ if (metadataStore == null) {
+ metadataStore = super.configurationMetadataStore;
+ }
+ NamespaceResources nsr =
+
spyWithClassAndConstructorArgs(NamespaceResources.class, metadataStore, 30);
+ TopicResources tsr =
spyWithClassAndConstructorArgs(TopicResources.class, metadataStore);
+ if (!super.useSpies) {
+ pulsarResources(
+ new
TestPulsarResources(super.localMetadataStore, super.configurationMetadataStore,
+ tsr, nsr));
+ } else {
+ pulsarResources(
+
spyWithClassAndConstructorArgs(TestPulsarResources.class,
super.localMetadataStore,
+ super.configurationMetadataStore,
tsr, nsr));
+ }
+ } else {
+ if (!super.useSpies) {
+ pulsarResources(
+ new
PulsarResources(super.localMetadataStore, super.configurationMetadataStore));
+ } else {
+ pulsarResources(
+
spyWithClassAndConstructorArgs(PulsarResources.class, super.localMetadataStore,
+ super.configurationMetadataStore));
+ }
+ }
+ }
+ if (super.brokerServiceFunction == null) {
+ if (super.brokerService == null) {
+ if (super.eventLoopGroup == null) {
+ super.eventLoopGroup = new NioEventLoopGroup();
+ }
+ brokerServiceFunction(pulsarService -> {
+ try {
+ if (!super.useSpies) {
+ return new BrokerService(pulsarService,
super.eventLoopGroup);
+ } else {
+ return
spyWithClassAndConstructorArgs(BrokerService.class, pulsarService,
+ super.eventLoopGroup);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ } else {
+ brokerServiceFunction(pulsarService ->
super.brokerService);
+ }
+ }
+ if (!super.useSpies) {
+ pulsarService(new TestPulsarService(super.config,
super.localMetadataStore,
+ super.configurationMetadataStore,
super.pulsarResources, super.managedLedgerClientFactory,
+ super.brokerServiceFunction, super.executor,
super.compactor));
+ } else {
+
pulsarService(spyWithClassAndConstructorArgs(TestPulsarService.class,
super.config,
+ super.localMetadataStore,
super.configurationMetadataStore, super.pulsarResources,
+ super.managedLedgerClientFactory,
super.brokerServiceFunction, super.executor,
+ super.compactor));
+ }
+ if (super.brokerService == null) {
+ brokerService(super.pulsarService.getBrokerService());
+ }
+ }
+ }
+
+ @NotNull
+ private static ManagedLedgerStorage
createManagedLedgerClientFactory(BookKeeper bookKeeperClient,
+
ManagedLedgerFactory managedLedgerFactory) {
+ return new ManagedLedgerStorage() {
+
+ @Override
+ public void initialize(ServiceConfiguration conf,
MetadataStoreExtended metadataStore,
+ BookKeeperClientFactory
bookkeeperProvider, EventLoopGroup eventLoopGroup)
+ throws Exception {
+
+ }
+
+ @Override
+ public ManagedLedgerFactory getManagedLedgerFactory() {
+ return managedLedgerFactory;
+ }
+
+ @Override
+ public StatsProvider getStatsProvider() {
+ return new NullStatsProvider();
+ }
+
+ @Override
+ public BookKeeper getBookKeeperClient() {
+ return bookKeeperClient;
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+ };
+ }
+ }
+
+ private static class TestPulsarResources extends PulsarResources {
+
+ private final TopicResources topicResources;
+ private final NamespaceResources namespaceResources;
+
+ public TestPulsarResources(MetadataStore localMetadataStore,
MetadataStore configurationMetadataStore,
+ TopicResources topicResources,
NamespaceResources namespaceResources) {
+ super(localMetadataStore, configurationMetadataStore);
+ this.topicResources = topicResources;
+ this.namespaceResources = namespaceResources;
+ }
+
+ @Override
+ public TopicResources getTopicResources() {
+ return topicResources;
+ }
+
+ @Override
+ public NamespaceResources getNamespaceResources() {
+ return namespaceResources;
+ }
+ }
+
+ private final MetadataStoreExtended localMetadataStore;
+ private final MetadataStoreExtended configurationMetadataStore;
+ private final PulsarResources pulsarResources;
+ private final ManagedLedgerStorage managedLedgerClientFactory;
+ private final BrokerService brokerService;
+
+ private final OrderedExecutor executor;
+
+ private final Compactor compactor;
+
+ private final SchemaRegistryService schemaRegistryService;
+
+ private final PulsarClient pulsarClient;
+
+ protected TestPulsarService(ServiceConfiguration config,
MetadataStoreExtended localMetadataStore,
+ MetadataStoreExtended
configurationMetadataStore, PulsarResources pulsarResources,
+ ManagedLedgerStorage
managedLedgerClientFactory,
+ Function<PulsarService, BrokerService>
brokerServiceFunction, OrderedExecutor executor,
+ Compactor compactor) {
+ super(config);
+ this.localMetadataStore = localMetadataStore;
+ this.configurationMetadataStore = configurationMetadataStore;
+ this.pulsarResources = pulsarResources;
+ this.managedLedgerClientFactory = managedLedgerClientFactory;
+ this.brokerService = brokerServiceFunction.apply(this);
+ this.executor = executor;
+ this.compactor = compactor;
+ this.schemaRegistryService =
spyWithClassAndConstructorArgs(DefaultSchemaRegistryService.class);
+ this.pulsarClient = mock(PulsarClientImpl.class);
+ }
+
+ @Override
+ public synchronized PulsarClient getClient() throws PulsarServerException {
+ return pulsarClient;
+ }
+
+ @Override
+ public SchemaRegistryService getSchemaRegistryService() {
+ return schemaRegistryService;
+ }
+
+ @Override
+ public MetadataStore
createConfigurationMetadataStore(PulsarMetadataEventSynchronizer synchronizer)
+ throws MetadataStoreException {
+ return configurationMetadataStore;
+ }
+
+ @Override
+ public MetadataStoreExtended
createLocalMetadataStore(PulsarMetadataEventSynchronizer synchronizer)
+ throws MetadataStoreException, PulsarServerException {
+ return localMetadataStore;
+ }
+
+ @Override
+ public PulsarResources getPulsarResources() {
+ return pulsarResources;
+ }
+
+ public BrokerService getBrokerService() {
+ return brokerService;
+ }
+
+ @Override
+ public Compactor getCompactor() throws PulsarServerException {
+ if (compactor != null) {
+ return compactor;
+ } else {
+ return super.getCompactor();
+ }
+ }
+
+ @Override
+ public MetadataStore getConfigurationMetadataStore() {
+ return configurationMetadataStore;
+ }
+
+ @Override
+ public MetadataStoreExtended getLocalMetadataStore() {
+ return localMetadataStore;
+ }
+
+ @Override
+ public ManagedLedgerStorage getManagedLedgerClientFactory() {
+ return managedLedgerClientFactory;
+ }
+
+ @Override
+ public OrderedExecutor getOrderedExecutor() {
+ return executor;
+ }
+
+ @Override
+ protected PulsarResources newPulsarResources() {
+ return pulsarResources;
+ }
+
+ @Override
+ protected ManagedLedgerStorage newManagedLedgerClientFactory() throws
Exception {
+ return managedLedgerClientFactory;
+ }
+
+ @Override
+ protected BrokerService newBrokerService(PulsarService pulsar) throws
Exception {
+ return brokerService;
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
index 0a227f6812c..703376b2546 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
@@ -19,7 +19,6 @@
package org.apache.pulsar.broker.service;
import static java.util.Collections.emptyMap;
-import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
import static org.apache.pulsar.common.api.proto.CommandAck.AckType.Cumulative;
import static
org.apache.pulsar.common.api.proto.CommandSubscribe.SubType.Exclusive;
import static
org.apache.pulsar.common.api.proto.CommandSubscribe.SubType.Failover;
@@ -36,29 +35,17 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
import java.net.InetSocketAddress;
-import java.util.Optional;
-import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.ManagedLedger;
-import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
-import org.apache.pulsar.broker.PulsarService;
-import org.apache.pulsar.broker.PulsarServiceMockSupport;
-import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.resources.PulsarResources;
+import org.apache.pulsar.broker.TestPulsarService;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
-import org.apache.pulsar.broker.transaction.TransactionTestBase;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.metadata.api.MetadataStore;
-import org.apache.pulsar.metadata.api.MetadataStoreConfig;
-import org.apache.pulsar.metadata.api.MetadataStoreFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
@@ -67,45 +54,17 @@ import org.testng.annotations.Test;
@Test(groups = "broker")
public class MessageCumulativeAckTest {
private final int consumerId = 1;
- private BrokerService brokerService;
+
private ServerCnx serverCnx;
- private MetadataStore store;
- protected PulsarService pulsar;
- private OrderedExecutor executor;
- private EventLoopGroup eventLoopGroup;
private PersistentSubscription sub;
+ private TestPulsarService.Factory testPulsarServiceFactory;
@BeforeMethod
public void setup() throws Exception {
- executor =
OrderedExecutor.newBuilder().numThreads(1).name("persistent-dispatcher-cumulative-ack-test").build();
- ServiceConfiguration svcConfig = spy(ServiceConfiguration.class);
- svcConfig.setBrokerShutdownTimeoutMs(0L);
- svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
- svcConfig.setClusterName("pulsar-cluster");
- pulsar = spyWithClassAndConstructorArgs(PulsarService.class,
svcConfig);
- doReturn(svcConfig).when(pulsar).getConfiguration();
-
- ManagedLedgerFactory mlFactoryMock = mock(ManagedLedgerFactory.class);
- doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
- doReturn(TransactionTestBase.createMockBookKeeper(executor))
- .when(pulsar).getBookKeeperClient();
-
- store = MetadataStoreFactory.create("memory:local",
MetadataStoreConfig.builder().build());
- doReturn(store).when(pulsar).getLocalMetadataStore();
- doReturn(store).when(pulsar).getConfigurationMetadataStore();
-
- PulsarResources pulsarResources = new PulsarResources(store, store);
- PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
- doReturn(pulsarResources).when(pulsar).getPulsarResources();
- });
-
- eventLoopGroup = new NioEventLoopGroup();
- brokerService = spyWithClassAndConstructorArgs(BrokerService.class,
pulsar, eventLoopGroup);
- PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
- doReturn(brokerService).when(pulsar).getBrokerService();
- });
-
- serverCnx = spyWithClassAndConstructorArgs(ServerCnx.class, pulsar);
+ testPulsarServiceFactory = TestPulsarService.Factory.builder()
+ .build();
+
+ serverCnx = testPulsarServiceFactory.createServerCnxSpy();
doReturn(true).when(serverCnx).isActive();
doReturn(true).when(serverCnx).isWritable();
doReturn(new InetSocketAddress("localhost",
1234)).when(serverCnx).clientAddress();
@@ -115,7 +74,7 @@ public class MessageCumulativeAckTest {
.when(serverCnx).getCommandSender();
String topicName =
TopicName.get("MessageCumulativeAckTest").toString();
- PersistentTopic persistentTopic = new PersistentTopic(topicName,
mock(ManagedLedger.class), brokerService);
+ PersistentTopic persistentTopic = new PersistentTopic(topicName,
mock(ManagedLedger.class), testPulsarServiceFactory.getBrokerService());
sub = spy(new PersistentSubscription(persistentTopic, "sub-1",
mock(ManagedCursorImpl.class), false));
doNothing().when(sub).acknowledgeMessage(any(), any(), any());
@@ -123,20 +82,10 @@ public class MessageCumulativeAckTest {
@AfterMethod(alwaysRun = true)
public void shutdown() throws Exception {
- if (brokerService != null) {
- brokerService.close();
- brokerService = null;
- }
- if (pulsar != null) {
- pulsar.close();
- pulsar = null;
- }
-
- executor.shutdown();
- if (eventLoopGroup != null) {
- eventLoopGroup.shutdownGracefully().get();
+ if (testPulsarServiceFactory != null) {
+ testPulsarServiceFactory.close();
+ testPulsarServiceFactory = null;
}
- store.close();
sub = null;
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index 5381a367afd..2bebac14023 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.service;
-import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
import static
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.matches;
@@ -38,8 +37,6 @@ import static org.testng.AssertJUnit.assertSame;
import static org.testng.AssertJUnit.assertTrue;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.ArrayList;
@@ -51,7 +48,6 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Supplier;
-import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
@@ -61,19 +57,15 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
-import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.pulsar.broker.PulsarService;
-import org.apache.pulsar.broker.PulsarServiceMockSupport;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.TestPulsarService;
import org.apache.pulsar.broker.namespace.NamespaceService;
-import org.apache.pulsar.broker.resources.PulsarResources;
import
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
-import org.apache.pulsar.broker.transaction.TransactionTestBase;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.CommandActiveConsumerChange;
@@ -82,12 +74,6 @@ import
org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.metadata.api.MetadataStore;
-import org.apache.pulsar.metadata.api.MetadataStoreConfig;
-import org.apache.pulsar.metadata.api.MetadataStoreFactory;
-import org.apache.zookeeper.ZooKeeper;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -98,59 +84,30 @@ import org.testng.annotations.Test;
@Test(groups = "quarantine")
public class PersistentDispatcherFailoverConsumerTest {
- private BrokerService brokerService;
- private ManagedLedgerFactory mlFactoryMock;
private ServerCnx serverCnx;
private ServerCnx serverCnxWithOldVersion;
private ManagedLedger ledgerMock;
private ManagedCursor cursorMock;
- private MetadataStore store;
private ChannelHandlerContext channelCtx;
private LinkedBlockingQueue<CommandActiveConsumerChange> consumerChanges;
- private ZooKeeper mockZk;
- protected PulsarService pulsar;
+
+ protected TestPulsarService.Factory testPulsarServiceFactory;
+
final String successTopicName =
"persistent://part-perf/global/perf.t1/ptopic";
final String failTopicName =
"persistent://part-perf/global/perf.t1/pfailTopic";
- private OrderedExecutor executor;
- private EventLoopGroup eventLoopGroup;
-
@BeforeMethod
public void setup() throws Exception {
- executor =
OrderedExecutor.newBuilder().numThreads(1).name("persistent-dispatcher-failover-test").build();
ServiceConfiguration svcConfig = spy(ServiceConfiguration.class);
svcConfig.setBrokerShutdownTimeoutMs(0L);
svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
svcConfig.setClusterName("pulsar-cluster");
svcConfig.setSystemTopicEnabled(false);
svcConfig.setTopicLevelPoliciesEnabled(false);
- pulsar = spyWithClassAndConstructorArgs(PulsarService.class,
svcConfig);
- store = MetadataStoreFactory.create("memory:local",
MetadataStoreConfig.builder().build());
- doReturn(store).when(pulsar).getLocalMetadataStore();
- doReturn(store).when(pulsar).getConfigurationMetadataStore();
-
- PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
- doReturn(svcConfig).when(pulsar).getConfiguration();
- });
-
- mlFactoryMock = mock(ManagedLedgerFactory.class);
- PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
- doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
- });
-
- doReturn(TransactionTestBase.createMockBookKeeper(executor))
- .when(pulsar).getBookKeeperClient();
- eventLoopGroup = new NioEventLoopGroup();
-
- PulsarResources pulsarResources = new PulsarResources(store, store);
- PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
- doReturn(pulsarResources).when(pulsar).getPulsarResources();
- });
-
- brokerService = spyWithClassAndConstructorArgs(BrokerService.class,
pulsar, eventLoopGroup);
- PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
- doReturn(brokerService).when(pulsar).getBrokerService();
- });
+ testPulsarServiceFactory = TestPulsarService.Factory.builder()
+ .config(svcConfig)
+ .useSpies(true)
+ .build();
consumerChanges = new LinkedBlockingQueue<>();
this.channelCtx = mock(ChannelHandlerContext.class);
@@ -175,7 +132,7 @@ public class PersistentDispatcherFailoverConsumerTest {
return null;
}).when(channelCtx).writeAndFlush(any(), any());
- serverCnx = spyWithClassAndConstructorArgs(ServerCnx.class, pulsar);
+ serverCnx = testPulsarServiceFactory.createServerCnxSpy();
doReturn(true).when(serverCnx).isActive();
doReturn(true).when(serverCnx).isWritable();
doReturn(new InetSocketAddress("localhost",
1234)).when(serverCnx).clientAddress();
@@ -184,7 +141,7 @@ public class PersistentDispatcherFailoverConsumerTest {
doReturn(new PulsarCommandSenderImpl(null, serverCnx))
.when(serverCnx).getCommandSender();
- serverCnxWithOldVersion =
spyWithClassAndConstructorArgs(ServerCnx.class, pulsar);
+ serverCnxWithOldVersion =
testPulsarServiceFactory.createServerCnxSpy();
doReturn(true).when(serverCnxWithOldVersion).isActive();
doReturn(true).when(serverCnxWithOldVersion).isWritable();
doReturn(new InetSocketAddress("localhost", 1234))
@@ -196,9 +153,7 @@ public class PersistentDispatcherFailoverConsumerTest {
.when(serverCnxWithOldVersion).getCommandSender();
NamespaceService nsSvc = mock(NamespaceService.class);
- PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
- doReturn(nsSvc).when(pulsar).getNamespaceService();
- });
+
doReturn(nsSvc).when(testPulsarServiceFactory.getPulsarService()).getNamespaceService();
doReturn(true).when(nsSvc).isServiceUnitOwned(any(NamespaceBundle.class));
doReturn(true).when(nsSvc).isServiceUnitActive(any(TopicName.class));
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).checkTopicOwnership(any(TopicName.class));
@@ -209,23 +164,9 @@ public class PersistentDispatcherFailoverConsumerTest {
@AfterMethod(alwaysRun = true)
public void shutdown() throws Exception {
- if (brokerService != null) {
- brokerService.close();
- brokerService = null;
- }
- if (pulsar != null) {
- pulsar.close();
- pulsar = null;
- }
-
- if (executor != null) {
- executor.shutdown();
- }
- if (eventLoopGroup != null) {
- eventLoopGroup.shutdownGracefully().get();
- }
- if (store != null) {
- store.close();
+ if (testPulsarServiceFactory != null) {
+ testPulsarServiceFactory.close();
+ testPulsarServiceFactory = null;
}
}
@@ -233,64 +174,50 @@ public class PersistentDispatcherFailoverConsumerTest {
ledgerMock = mock(ManagedLedger.class);
cursorMock = mock(ManagedCursorImpl.class);
- doReturn(new ArrayList<Object>()).when(ledgerMock).getCursors();
+ doReturn(new ArrayList<>()).when(ledgerMock).getCursors();
doReturn("mockCursor").when(cursorMock).getName();
// call openLedgerComplete with ledgerMock on ML factory asyncOpen
- doAnswer(new Answer<Object>() {
- @Override
- public Object answer(InvocationOnMock invocationOnMock) throws
Throwable {
- ((OpenLedgerCallback)
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
- return null;
- }
- }).when(mlFactoryMock).asyncOpen(matches(".*success.*"),
any(ManagedLedgerConfig.class),
- any(OpenLedgerCallback.class), any(Supplier.class), any());
+ doAnswer(invocationOnMock -> {
+ ((OpenLedgerCallback)
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
+ return null;
+ }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+ .asyncOpen(matches(".*success.*"),
any(ManagedLedgerConfig.class),
+ any(OpenLedgerCallback.class), any(Supplier.class),
any());
// call openLedgerFailed on ML factory asyncOpen
- doAnswer(new Answer<Object>() {
- @Override
- public Object answer(InvocationOnMock invocationOnMock) throws
Throwable {
- ((OpenLedgerCallback) invocationOnMock.getArguments()[2])
- .openLedgerFailed(new ManagedLedgerException("Managed
ledger failure"), null);
- return null;
- }
- }).when(mlFactoryMock).asyncOpen(matches(".*fail.*"),
any(ManagedLedgerConfig.class),
- any(OpenLedgerCallback.class), any(Supplier.class), any());
+ doAnswer(invocationOnMock -> {
+ ((OpenLedgerCallback) invocationOnMock.getArguments()[2])
+ .openLedgerFailed(new ManagedLedgerException("Managed
ledger failure"), null);
+ return null;
+ }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+ .asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class),
+ any(OpenLedgerCallback.class), any(Supplier.class),
any());
// call addComplete on ledger asyncAddEntry
- doAnswer(new Answer<Object>() {
- @Override
- public Object answer(InvocationOnMock invocationOnMock) throws
Throwable {
- ((AddEntryCallback)
invocationOnMock.getArguments()[1]).addComplete(
- new PositionImpl(1, 1), null, null);
- return null;
- }
+ doAnswer(invocationOnMock -> {
+ ((AddEntryCallback)
invocationOnMock.getArguments()[1]).addComplete(
+ new PositionImpl(1, 1), null, null);
+ return null;
}).when(ledgerMock).asyncAddEntry(any(byte[].class),
any(AddEntryCallback.class), any());
// call openCursorComplete on cursor asyncOpen
- doAnswer(new Answer<Object>() {
- @Override
- public Object answer(InvocationOnMock invocationOnMock) throws
Throwable {
- ((OpenCursorCallback)
invocationOnMock.getArguments()[2]).openCursorComplete(cursorMock, null);
- return null;
- }
- }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"),
any(InitialPosition.class), any(OpenCursorCallback.class), any());
+ doAnswer(invocationOnMock -> {
+ ((OpenCursorCallback)
invocationOnMock.getArguments()[2]).openCursorComplete(cursorMock, null);
+ return null;
+ }).when(ledgerMock)
+ .asyncOpenCursor(matches(".*success.*"),
any(InitialPosition.class), any(OpenCursorCallback.class),
+ any());
// call deleteLedgerComplete on ledger asyncDelete
- doAnswer(new Answer<Object>() {
- @Override
- public Object answer(InvocationOnMock invocationOnMock) throws
Throwable {
- ((DeleteLedgerCallback)
invocationOnMock.getArguments()[0]).deleteLedgerComplete(null);
- return null;
- }
+ doAnswer(invocationOnMock -> {
+ ((DeleteLedgerCallback)
invocationOnMock.getArguments()[0]).deleteLedgerComplete(null);
+ return null;
}).when(ledgerMock).asyncDelete(any(DeleteLedgerCallback.class),
any());
- doAnswer(new Answer<Object>() {
- @Override
- public Object answer(InvocationOnMock invocationOnMock) throws
Throwable {
- ((DeleteCursorCallback)
invocationOnMock.getArguments()[1]).deleteCursorComplete(null);
- return null;
- }
+ doAnswer(invocationOnMock -> {
+ ((DeleteCursorCallback)
invocationOnMock.getArguments()[1]).deleteCursorComplete(null);
+ return null;
}).when(ledgerMock).asyncDeleteCursor(matches(".*success.*"),
any(DeleteCursorCallback.class), any());
}
@@ -303,7 +230,8 @@ public class PersistentDispatcherFailoverConsumerTest {
@Test
public void testConsumerGroupChangesWithOldNewConsumers() throws Exception
{
- PersistentTopic topic = new PersistentTopic(successTopicName,
ledgerMock, brokerService);
+ PersistentTopic topic =
+ new PersistentTopic(successTopicName, ledgerMock,
testPulsarServiceFactory.getBrokerService());
PersistentSubscription sub = new PersistentSubscription(topic,
"sub-1", cursorMock, false);
int partitionIndex = 0;
@@ -315,7 +243,8 @@ public class PersistentDispatcherFailoverConsumerTest {
// 2. Add old consumer
Consumer consumer1 = new Consumer(sub, SubType.Exclusive,
topic.getName(), 1 /* consumer id */, 0,
- "Cons1"/* consumer name */, true, serverCnxWithOldVersion,
"myrole-1", Collections.emptyMap(), false, null, MessageId.latest,
DEFAULT_CONSUMER_EPOCH);
+ "Cons1"/* consumer name */, true, serverCnxWithOldVersion,
"myrole-1", Collections.emptyMap(), false,
+ null, MessageId.latest, DEFAULT_CONSUMER_EPOCH);
pdfc.addConsumer(consumer1);
List<Consumer> consumers = pdfc.getConsumers();
assertSame(consumers.get(0).consumerName(), consumer1.consumerName());
@@ -326,7 +255,8 @@ public class PersistentDispatcherFailoverConsumerTest {
// 3. Add new consumer
Consumer consumer2 = new Consumer(sub, SubType.Exclusive,
topic.getName(), 2 /* consumer id */, 0,
- "Cons2"/* consumer name */, true, serverCnx, "myrole-1",
Collections.emptyMap(), false, null, MessageId.latest, DEFAULT_CONSUMER_EPOCH);
+ "Cons2"/* consumer name */, true, serverCnx, "myrole-1",
Collections.emptyMap(), false, null,
+ MessageId.latest, DEFAULT_CONSUMER_EPOCH);
pdfc.addConsumer(consumer2);
consumers = pdfc.getConsumers();
assertSame(consumers.get(0).consumerName(), consumer1.consumerName());
@@ -342,7 +272,7 @@ public class PersistentDispatcherFailoverConsumerTest {
public void testAddRemoveConsumer() throws Exception {
log.info("--- Starting
PersistentDispatcherFailoverConsumerTest::testAddConsumer ---");
- PersistentTopic topic = new PersistentTopic(successTopicName,
ledgerMock, brokerService);
+ PersistentTopic topic = new PersistentTopic(successTopicName,
ledgerMock, testPulsarServiceFactory.getBrokerService());
PersistentSubscription sub = new PersistentSubscription(topic,
"sub-1", cursorMock, false);
int partitionIndex = 4;
@@ -473,7 +403,8 @@ public class PersistentDispatcherFailoverConsumerTest {
public void testAddRemoveConsumerNonPartitionedTopic() throws Exception {
log.info("--- Starting
PersistentDispatcherFailoverConsumerTest::testAddConsumer ---");
- PersistentTopic topic = new PersistentTopic(successTopicName,
ledgerMock, brokerService);
+ PersistentTopic topic =
+ new PersistentTopic(successTopicName, ledgerMock,
testPulsarServiceFactory.getBrokerService());
PersistentSubscription sub = new PersistentSubscription(topic,
"sub-1", cursorMock, false);
// Non partitioned topic.
@@ -533,7 +464,8 @@ public class PersistentDispatcherFailoverConsumerTest {
@Test
public void
testMultipleDispatcherGetNextConsumerWithDifferentPriorityLevel() throws
Exception {
- PersistentTopic topic = new PersistentTopic(successTopicName,
ledgerMock, brokerService);
+ PersistentTopic topic =
+ new PersistentTopic(successTopicName, ledgerMock,
testPulsarServiceFactory.getBrokerService());
PersistentDispatcherMultipleConsumers dispatcher = new
PersistentDispatcherMultipleConsumers(topic, cursorMock, null);
Consumer consumer1 = createConsumer(topic, 0, 2, false, 1);
Consumer consumer2 = createConsumer(topic, 0, 2, false, 2);
@@ -577,7 +509,8 @@ public class PersistentDispatcherFailoverConsumerTest {
@Test
public void testFewBlockedConsumerSamePriority() throws Exception{
- PersistentTopic topic = new PersistentTopic(successTopicName,
ledgerMock, brokerService);
+ PersistentTopic topic =
+ new PersistentTopic(successTopicName, ledgerMock,
testPulsarServiceFactory.getBrokerService());
PersistentDispatcherMultipleConsumers dispatcher = new
PersistentDispatcherMultipleConsumers(topic, cursorMock, null);
Consumer consumer1 = createConsumer(topic, 0, 2, false, 1);
Consumer consumer2 = createConsumer(topic, 0, 2, false, 2);
@@ -604,7 +537,8 @@ public class PersistentDispatcherFailoverConsumerTest {
@Test
public void testFewBlockedConsumerDifferentPriority() throws Exception {
- PersistentTopic topic = new PersistentTopic(successTopicName,
ledgerMock, brokerService);
+ PersistentTopic topic =
+ new PersistentTopic(successTopicName, ledgerMock,
testPulsarServiceFactory.getBrokerService());
PersistentDispatcherMultipleConsumers dispatcher = new
PersistentDispatcherMultipleConsumers(topic, cursorMock, null);
Consumer consumer1 = createConsumer(topic, 0, 2, false, 1);
Consumer consumer2 = createConsumer(topic, 0, 2, false, 2);
@@ -658,7 +592,8 @@ public class PersistentDispatcherFailoverConsumerTest {
@Test
public void testFewBlockedConsumerDifferentPriority2() throws Exception {
- PersistentTopic topic = new PersistentTopic(successTopicName,
ledgerMock, brokerService);
+ PersistentTopic topic =
+ new PersistentTopic(successTopicName, ledgerMock,
testPulsarServiceFactory.getBrokerService());
PersistentDispatcherMultipleConsumers dispatcher = new
PersistentDispatcherMultipleConsumers(topic, cursorMock, null);
Consumer consumer1 = createConsumer(topic, 0, 2, true, 1);
Consumer consumer2 = createConsumer(topic, 0, 2, true, 2);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 8e3c3c73054..ae9fb9a1bf7 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -18,9 +18,7 @@
*/
package org.apache.pulsar.broker.service;
-import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgsRecordingInvocations;
-import static
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
import static
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
@@ -48,14 +46,11 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.DefaultEventLoop;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URL;
-import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -76,7 +71,6 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import lombok.Cleanup;
-import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
@@ -90,19 +84,15 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
-import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.broker.PulsarService;
-import org.apache.pulsar.broker.PulsarServiceMockSupport;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.TestPulsarService;
import org.apache.pulsar.broker.namespace.NamespaceService;
-import org.apache.pulsar.broker.resources.NamespaceResources;
-import org.apache.pulsar.broker.resources.PulsarResources;
-import org.apache.pulsar.broker.resources.TopicResources;
import org.apache.pulsar.broker.service.persistent.CompactorSubscription;
import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator;
import
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
@@ -110,8 +100,6 @@ import
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleAct
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
-import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService;
-import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
@@ -128,7 +116,6 @@ import org.apache.pulsar.common.api.proto.KeySharedMode;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ProducerAccessMode;
import org.apache.pulsar.common.naming.NamespaceBundle;
-import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.Policies;
@@ -138,9 +125,7 @@ import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
-import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
-import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.compaction.CompactedTopic;
import org.apache.pulsar.compaction.CompactedTopicContext;
import org.apache.pulsar.compaction.Compactor;
@@ -149,7 +134,6 @@ import
org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
import org.awaitility.Awaitility;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -160,10 +144,6 @@ import org.testng.annotations.Test;
@Test(groups = "broker")
public class PersistentTopicTest extends MockedBookKeeperTestCase {
- protected PulsarService pulsar;
- private BrokerService brokerService;
- private SchemaRegistryService schemaRegistryService;
- private ManagedLedgerFactory mlFactoryMock;
private ServerCnx serverCnx;
private ManagedLedger ledgerMock;
private ManagedCursor cursorMock;
@@ -175,14 +155,14 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
final String successSubName2 = "successSub2";
private static final Logger log =
LoggerFactory.getLogger(PersistentTopicTest.class);
- private OrderedExecutor executor;
- private EventLoopGroup eventLoopGroup;
+ protected TestPulsarService.Factory testPulsarServiceFactory;
+
+ private BrokerService brokerService;
+
@BeforeMethod(alwaysRun = true)
public void setup() throws Exception {
- eventLoopGroup = new NioEventLoopGroup();
- executor = OrderedExecutor.newBuilder().numThreads(1).build();
- ServiceConfiguration svcConfig = spy(ServiceConfiguration.class);
+ ServiceConfiguration svcConfig = new ServiceConfiguration();
svcConfig.setAdvertisedAddress("localhost");
svcConfig.setBrokerShutdownTimeoutMs(0L);
svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
@@ -190,47 +170,24 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
svcConfig.setClusterName("pulsar-cluster");
svcConfig.setTopicLevelPoliciesEnabled(false);
svcConfig.setSystemTopicEnabled(false);
- pulsar = spyWithClassAndConstructorArgs(PulsarService.class,
svcConfig);
- doReturn(svcConfig).when(pulsar).getConfiguration();
- doReturn(mock(Compactor.class)).when(pulsar).getCompactor();
-
- // create SchemaRegistryService for testDeleteTopic() otherwise it
will fail
- schemaRegistryService =
spyWithClassAndConstructorArgs(DefaultSchemaRegistryService.class);
-
doReturn(schemaRegistryService).when(pulsar).getSchemaRegistryService();
-
- mlFactoryMock = mock(ManagedLedgerFactory.class);
- doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
- doReturn(mock(PulsarClientImpl.class)).when(pulsar).getClient();
+ testPulsarServiceFactory = TestPulsarService.Factory.builder()
+ .config(svcConfig)
+ .useSpies(true)
+ .useTestPulsarResources(metadataStore)
+ .compactor(mock(Compactor.class))
+ .build();
+ brokerService = testPulsarServiceFactory.getBrokerService();
doAnswer(invocationOnMock -> CompletableFuture.completedFuture(null))
- .when(mlFactoryMock).getManagedLedgerPropertiesAsync(any());
+
.when(testPulsarServiceFactory.getManagedLedgerFactory()).getManagedLedgerPropertiesAsync(any());
doAnswer(invocation -> {
DeleteLedgerCallback deleteLedgerCallback =
invocation.getArgument(1);
deleteLedgerCallback.deleteLedgerComplete(null);
return null;
- }).when(mlFactoryMock).asyncDelete(any(), any(), any());
- // Mock metaStore.
- doReturn(createMockBookKeeper(executor))
- .when(pulsar).getBookKeeperClient();
- doReturn(executor).when(pulsar).getOrderedExecutor();
- doReturn(metadataStore).when(pulsar).getLocalMetadataStore();
- doReturn(metadataStore).when(pulsar).getConfigurationMetadataStore();
- // Mock pulsarResources.
- PulsarResources pulsarResources =
spyWithClassAndConstructorArgs(PulsarResources.class, metadataStore,
metadataStore);
- NamespaceResources nsr =
spyWithClassAndConstructorArgs(NamespaceResources.class, metadataStore, 30);
- TopicResources tsr =
spyWithClassAndConstructorArgs(TopicResources.class, metadataStore);
- doReturn(nsr).when(pulsarResources).getNamespaceResources();
- doReturn(tsr).when(pulsarResources).getTopicResources();
- PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
- doReturn(pulsarResources).when(pulsar).getPulsarResources();
- });
- // Mock brokerService.
- brokerService = spyWithClassAndConstructorArgs(BrokerService.class,
pulsar, eventLoopGroup);
- PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
- doReturn(brokerService).when(pulsar).getBrokerService();
- });
+
}).when(testPulsarServiceFactory.getManagedLedgerFactory()).asyncDelete(any(),
any(), any());
// Mock serviceCnx.
- serverCnx =
spyWithClassAndConstructorArgsRecordingInvocations(ServerCnx.class, pulsar);
+ serverCnx =
spyWithClassAndConstructorArgsRecordingInvocations(ServerCnx.class,
+ testPulsarServiceFactory.getPulsarService());
doReturn(true).when(serverCnx).isActive();
doReturn(true).when(serverCnx).isWritable();
doReturn(new InetSocketAddress("localhost",
1234)).when(serverCnx).clientAddress();
@@ -245,9 +202,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
NamespaceService nsSvc = mock(NamespaceService.class);
NamespaceBundle bundle = mock(NamespaceBundle.class);
doReturn(CompletableFuture.completedFuture(bundle)).when(nsSvc).getBundleAsync(any());
- PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
- doReturn(nsSvc).when(pulsar).getNamespaceService();
- });
+
doReturn(nsSvc).when(testPulsarServiceFactory.getPulsarService()).getNamespaceService();
doReturn(true).when(nsSvc).isServiceUnitOwned(any());
doReturn(true).when(nsSvc).isServiceUnitActive(any());
doReturn(CompletableFuture.completedFuture(true)).when(nsSvc).isServiceUnitActiveAsync(any());
@@ -258,13 +213,10 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
@AfterMethod(alwaysRun = true)
public void teardown() throws Exception {
- brokerService.close();
- pulsar.close();
- GracefulExecutorServicesShutdown.initiate()
- .timeout(Duration.ZERO)
- .shutdown(executor)
- .handle().get();
- EventLoopUtil.shutdownGracefully(eventLoopGroup).get();
+ if (testPulsarServiceFactory != null) {
+ testPulsarServiceFactory.close();
+ testPulsarServiceFactory = null;
+ }
}
@Test
@@ -273,18 +225,15 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
doReturn(new ArrayList<>()).when(ledgerMock).getCursors();
final String topicName = "persistent://prop/use/ns-abc/topic1";
- doAnswer(new Answer<Object>() {
- @Override
- public Object answer(InvocationOnMock invocationOnMock) {
- ((OpenLedgerCallback)
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
- return null;
- }
- }).when(mlFactoryMock).asyncOpen(anyString(),
any(ManagedLedgerConfig.class), any(OpenLedgerCallback.class),
- any(Supplier.class), any());
+ doAnswer(invocationOnMock -> {
+ ((OpenLedgerCallback)
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
+ return null;
+ }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+ .asyncOpen(anyString(), any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class),
+ any(Supplier.class), any());
- CompletableFuture<Void> future =
brokerService.getOrCreateTopic(topicName).thenAccept(topic -> {
- assertTrue(topic.toString().contains(topicName));
- }).exceptionally((t) -> {
+ CompletableFuture<Void> future =
brokerService.getOrCreateTopic(topicName).thenAccept(topic ->
+
assertTrue(topic.toString().contains(topicName))).exceptionally((t) -> {
fail("should not fail");
return null;
});
@@ -300,16 +249,14 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
@Test
public void testCreateTopicMLFailure() {
final String jinxedTopicName = "persistent://prop/use/ns-abc/topic3";
- doAnswer(new Answer<Object>() {
- @Override
- public Object answer(InvocationOnMock invocationOnMock) {
- new Thread(() -> ((OpenLedgerCallback)
invocationOnMock.getArguments()[2])
- .openLedgerFailed(new ManagedLedgerException("Managed
ledger failure"), null)).start();
+ doAnswer(invocationOnMock -> {
+ new Thread(() -> ((OpenLedgerCallback)
invocationOnMock.getArguments()[2])
+ .openLedgerFailed(new ManagedLedgerException("Managed
ledger failure"), null)).start();
- return null;
- }
- }).when(mlFactoryMock).asyncOpen(anyString(),
any(ManagedLedgerConfig.class), any(OpenLedgerCallback.class),
- any(Supplier.class), any());
+ return null;
+ }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+ .asyncOpen(anyString(), any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class),
+ any(Supplier.class), any());
CompletableFuture<Topic> future =
brokerService.getOrCreateTopic(jinxedTopicName);
@@ -367,7 +314,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
}
@Test
- public void testDispatcherMultiConsumerReadFailed() throws Exception {
+ public void testDispatcherMultiConsumerReadFailed() {
PersistentTopic topic =
spyWithClassAndConstructorArgsRecordingInvocations(PersistentTopic.class,
successTopicName, ledgerMock, brokerService);
ManagedCursor cursor = mock(ManagedCursor.class);
when(cursor.getName()).thenReturn("cursor");
@@ -379,7 +326,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
}
@Test
- public void testDispatcherSingleConsumerReadFailed() throws Exception {
+ public void testDispatcherSingleConsumerReadFailed() {
PersistentTopic topic =
spyWithClassAndConstructorArgsRecordingInvocations(PersistentTopic.class,
successTopicName, ledgerMock, brokerService);
ManagedCursor cursor = mock(ManagedCursor.class);
when(cursor.getName()).thenReturn("cursor");
@@ -395,26 +342,18 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
final String successTopicName =
"persistent://prop/use/ns-abc/successTopic";
final ManagedLedger ledgerMock = mock(ManagedLedger.class);
- doReturn(new ArrayList<Object>()).when(ledgerMock).getCursors();
+ doReturn(new ArrayList<>()).when(ledgerMock).getCursors();
PersistentTopic topic = new PersistentTopic(successTopicName,
ledgerMock, brokerService);
- MessageMetadata messageMetadata = new MessageMetadata()
- .setPublishTime(System.currentTimeMillis())
- .setProducerName("prod-name")
- .setSequenceId(1);
-
ByteBuf payload = Unpooled.wrappedBuffer("content".getBytes());
final CountDownLatch latch = new CountDownLatch(1);
// override asyncAddEntry callback to return error
- doAnswer(new Answer<Object>() {
- @Override
- public Object answer(InvocationOnMock invocationOnMock) throws
Throwable {
- ((AddEntryCallback)
invocationOnMock.getArguments()[1]).addFailed(
- new ManagedLedgerException("Managed ledger failure"),
invocationOnMock.getArguments()[2]);
- return null;
- }
+ doAnswer((Answer<Object>) invocationOnMock -> {
+ ((AddEntryCallback) invocationOnMock.getArguments()[1]).addFailed(
+ new ManagedLedgerException("Managed ledger failure"),
invocationOnMock.getArguments()[2]);
+ return null;
}).when(ledgerMock).asyncAddEntry(any(ByteBuf.class),
any(AddEntryCallback.class), any());
topic.publishMessage(payload, (exception, ledgerId, entryId) -> {
@@ -429,7 +368,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
}
@Test
- public void testAddRemoveProducer() throws Exception {
+ public void testAddRemoveProducer() {
PersistentTopic topic = new PersistentTopic(successTopicName,
ledgerMock, brokerService);
String role = "appid1";
@@ -479,7 +418,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
}
@Test
- public void testProducerOverwrite() throws Exception {
+ public void testProducerOverwrite() {
PersistentTopic topic = new PersistentTopic(successTopicName,
ledgerMock, brokerService);
String role = "appid1";
Producer producer1 = new Producer(topic, serverCnx, 1 /* producer id
*/, "prod-name",
@@ -548,7 +487,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
topic.getProducers().values().forEach(producer ->
Assert.assertEquals(producer.getEpoch(), 3));
}
- private void testMaxProducers() throws Exception {
+ private void testMaxProducers() {
PersistentTopic topic = new PersistentTopic(successTopicName,
ledgerMock, brokerService);
topic.initialize();
String role = "appid1";
@@ -576,40 +515,28 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
}
@Test
- public void testMaxProducersForBroker() throws Exception {
+ public void testMaxProducersForBroker() {
// set max clients
- ServiceConfiguration svcConfig = spy(ServiceConfiguration.class);
- doReturn(2).when(svcConfig).getMaxProducersPerTopic();
- doReturn(svcConfig).when(pulsar).getConfiguration();
+ testPulsarServiceFactory.getConfig().setMaxProducersPerTopic(2);
testMaxProducers();
}
@Test
public void testMaxProducersForNamespace() throws Exception {
- ServiceConfiguration svcConfig = spy(ServiceConfiguration.class);
- doReturn(svcConfig).when(pulsar).getConfiguration();
// set max clients
Policies policies = new Policies();
policies.max_producers_per_topic = 2;
- when(pulsar.getPulsarResources().getNamespaceResources()
-
.getPoliciesIfCached(TopicName.get(successTopicName).getNamespaceObject()))
- .thenReturn(Optional.of(policies));
-
- when(pulsar.getPulsarResources().getNamespaceResources()
-
.getPolicies(TopicName.get(successTopicName).getNamespaceObject()))
- .thenReturn(Optional.of(policies));
- when(pulsar.getPulsarResources().getNamespaceResources()
-
.getPoliciesAsync(TopicName.get(successTopicName).getNamespaceObject()))
-
.thenReturn(CompletableFuture.completedFuture(Optional.of(policies)));
+ testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
+
.createPolicies(TopicName.get(successTopicName).getNamespaceObject(),
+ policies);
testMaxProducers();
}
- private Producer getMockedProducerWithSpecificAddress(Topic topic, long
producerId, InetAddress address)
- throws Exception {
+ private Producer getMockedProducerWithSpecificAddress(Topic topic, long
producerId, InetAddress address) {
final String producerNameBase = "producer";
final String role = "appid1";
- ServerCnx cnx = spyWithClassAndConstructorArgs(ServerCnx.class,
pulsar);
+ ServerCnx cnx = testPulsarServiceFactory.createServerCnxSpy();
doReturn(true).when(cnx).isActive();
doReturn(true).when(cnx).isWritable();
doReturn(new InetSocketAddress(address,
1234)).when(cnx).clientAddress();
@@ -623,9 +550,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
@Test
public void testMaxSameAddressProducers() throws Exception {
// set max clients
- ServiceConfiguration svcConfig = spy(ServiceConfiguration.class);
- doReturn(2).when(svcConfig).getMaxSameAddressProducersPerTopic();
- doReturn(svcConfig).when(pulsar).getConfiguration();
+
testPulsarServiceFactory.getConfig().setMaxSameAddressProducersPerTopic(2);
PersistentTopic topic = new PersistentTopic(successTopicName,
ledgerMock, brokerService);
@@ -719,14 +644,13 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
}
private SubscriptionOption getSubscriptionOption(CommandSubscribe cmd) {
- SubscriptionOption subscriptionOption =
SubscriptionOption.builder().cnx(serverCnx)
+ return SubscriptionOption.builder().cnx(serverCnx)
.subscriptionName(cmd.getSubscription()).consumerId(cmd.getConsumerId()).subType(cmd.getSubType())
.priorityLevel(0).consumerName(cmd.getConsumerName()).isDurable(cmd.isDurable()).startMessageId(null)
.metadata(Collections.emptyMap()).readCompacted(false)
.initialPosition(InitialPosition.Latest).subscriptionProperties(Optional.empty())
.startMessageRollbackDurationSec(0).replicatedSubscriptionStateArg(false).keySharedMeta(null)
.build();
- return subscriptionOption;
}
@Test
@@ -929,33 +853,21 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
@Test
public void testMaxConsumersSharedForBroker() throws Exception {
// set max clients
- ServiceConfiguration svcConfig = spy(ServiceConfiguration.class);
- doReturn(2).when(svcConfig).getMaxConsumersPerSubscription();
- doReturn(3).when(svcConfig).getMaxConsumersPerTopic();
- doReturn(svcConfig).when(pulsar).getConfiguration();
-
+ testPulsarServiceFactory.getConfig().setMaxConsumersPerSubscription(2);
+ testPulsarServiceFactory.getConfig().setMaxConsumersPerTopic(3);
testMaxConsumersShared();
}
@Test
public void testMaxConsumersSharedForNamespace() throws Exception {
- ServiceConfiguration svcConfig = spy(ServiceConfiguration.class);
- doReturn(svcConfig).when(pulsar).getConfiguration();
-
// set max clients
Policies policies = new Policies();
policies.max_consumers_per_subscription = 2;
policies.max_consumers_per_topic = 3;
- when(pulsar.getPulsarResources().getNamespaceResources()
-
.getPolicies(TopicName.get(successTopicName).getNamespaceObject()))
- .thenReturn(Optional.of(policies));
- when(pulsar.getPulsarResources().getNamespaceResources()
-
.getPoliciesIfCached(TopicName.get(successTopicName).getNamespaceObject()))
- .thenReturn(Optional.of(policies));
- when(pulsar.getPulsarResources().getNamespaceResources()
-
.getPoliciesAsync(TopicName.get(successTopicName).getNamespaceObject()))
-
.thenReturn(CompletableFuture.completedFuture(Optional.of(policies)));
+ testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
+
.createPolicies(TopicName.get(successTopicName).getNamespaceObject(),
+ policies);
testMaxConsumersShared();
}
@@ -1037,42 +949,32 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
@Test
public void testMaxConsumersFailoverForBroker() throws Exception {
// set max clients
- ServiceConfiguration svcConfig = spy(ServiceConfiguration.class);
- doReturn(2).when(svcConfig).getMaxConsumersPerSubscription();
- doReturn(3).when(svcConfig).getMaxConsumersPerTopic();
- doReturn(svcConfig).when(pulsar).getConfiguration();
+ testPulsarServiceFactory.getConfig().setMaxConsumersPerSubscription(2);
+ testPulsarServiceFactory.getConfig().setMaxConsumersPerTopic(3);
testMaxConsumersFailover();
}
@Test
public void testMaxConsumersFailoverForNamespace() throws Exception {
- ServiceConfiguration svcConfig = spy(ServiceConfiguration.class);
- doReturn(svcConfig).when(pulsar).getConfiguration();
-
// set max clients
Policies policies = new Policies();
policies.max_consumers_per_subscription = 2;
policies.max_consumers_per_topic = 3;
- when(pulsar.getPulsarResources().getNamespaceResources()
-
.getPolicies(TopicName.get(successTopicName).getNamespaceObject()))
- .thenReturn(Optional.of(policies));
- when(pulsar.getPulsarResources().getNamespaceResources()
-
.getPoliciesIfCached(TopicName.get(successTopicName).getNamespaceObject()))
- .thenReturn(Optional.of(policies));
- when(pulsar.getPulsarResources().getNamespaceResources()
-
.getPoliciesAsync(TopicName.get(successTopicName).getNamespaceObject()))
-
.thenReturn(CompletableFuture.completedFuture(Optional.of(policies)));
+ testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
+
.createPolicies(TopicName.get(successTopicName).getNamespaceObject(),
+ policies);
+
testMaxConsumersFailover();
}
private Consumer getMockedConsumerWithSpecificAddress(Topic topic,
Subscription sub, long consumerId,
- InetAddress address)
throws Exception {
+ InetAddress address)
{
final String consumerNameBase = "consumer";
final String role = "appid1";
- ServerCnx cnx = spyWithClassAndConstructorArgs(ServerCnx.class,
pulsar);
+ ServerCnx cnx = testPulsarServiceFactory.createServerCnxSpy();
doReturn(true).when(cnx).isActive();
doReturn(true).when(cnx).isWritable();
doReturn(new InetSocketAddress(address,
1234)).when(cnx).clientAddress();
@@ -1086,9 +988,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
@Test
public void testMaxSameAddressConsumers() throws Exception {
// set max clients
- ServiceConfiguration svcConfig = spy(ServiceConfiguration.class);
- doReturn(2).when(svcConfig).getMaxSameAddressConsumersPerTopic();
- doReturn(svcConfig).when(pulsar).getConfiguration();
+
testPulsarServiceFactory.getConfig().setMaxSameAddressConsumersPerTopic(2);
PersistentTopic topic = new PersistentTopic(successTopicName,
ledgerMock, brokerService);
PersistentSubscription sub1 = new PersistentSubscription(topic,
"sub1", cursorMock, false);
@@ -1195,13 +1095,10 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
true, serverCnx, "myrole-1", Collections.emptyMap(), false /*
read compacted */, null, MessageId.latest, DEFAULT_CONSUMER_EPOCH);
sub.addConsumer(consumer1);
- doAnswer(new Answer<Object>() {
- @Override
- public Object answer(InvocationOnMock invocationOnMock) throws
Throwable {
- ((DeleteCursorCallback)
invocationOnMock.getArguments()[1]).deleteCursorComplete(null);
- Thread.sleep(1000);
- return null;
- }
+ doAnswer(invocationOnMock -> {
+ ((DeleteCursorCallback)
invocationOnMock.getArguments()[1]).deleteCursorComplete(null);
+ Thread.sleep(1000);
+ return null;
}).when(ledgerMock).asyncDeleteCursor(matches(".*success.*"),
any(DeleteCursorCallback.class), any());
@Cleanup("shutdownNow")
@@ -1294,7 +1191,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
topic.addProducer(producer, new CompletableFuture<>()).join();
CompletableFuture<Void> cf = topic.delete();
- Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() ->
cf.isDone());
+ Awaitility.await().atMost(30, TimeUnit.SECONDS).until(cf::isDone);
assertTrue(cf.isCompletedExceptionally());
assertFalse((boolean) isFencedField.get(topic));
assertFalse((boolean) isClosingOrDeletingField.get(topic));
@@ -1313,7 +1210,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
f1.get();
CompletableFuture<Void> cf2 = topic.delete();
- Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() ->
cf2.isDone());
+ Awaitility.await().atMost(30, TimeUnit.SECONDS).until(cf2::isDone);
assertTrue(cf2.isCompletedExceptionally());
assertFalse((boolean) isFencedField.get(topic));
assertFalse((boolean) isClosingOrDeletingField.get(topic));
@@ -1340,37 +1237,31 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
final CountDownLatch counter = new CountDownLatch(2);
final AtomicBoolean gotException = new AtomicBoolean(false);
- Thread deleter = new Thread() {
- @Override
- public void run() {
- try {
- barrier.await();
- assertFalse(topic.delete().isCompletedExceptionally());
- } catch (Exception e) {
- e.printStackTrace();
- gotException.set(true);
- } finally {
- counter.countDown();
- }
+ Thread deleter = new Thread(() -> {
+ try {
+ barrier.await();
+ assertFalse(topic.delete().isCompletedExceptionally());
+ } catch (Exception e) {
+ e.printStackTrace();
+ gotException.set(true);
+ } finally {
+ counter.countDown();
}
- };
+ });
- Thread unsubscriber = new Thread() {
- @Override
- public void run() {
- try {
- barrier.await();
-
- // do topic unsubscribe
- topic.unsubscribe(successSubName);
- } catch (Exception e) {
- e.printStackTrace();
- gotException.set(true);
- } finally {
- counter.countDown();
- }
+ Thread unsubscriber = new Thread(() -> {
+ try {
+ barrier.await();
+
+ // do topic unsubscribe
+ topic.unsubscribe(successSubName);
+ } catch (Exception e) {
+ e.printStackTrace();
+ gotException.set(true);
+ } finally {
+ counter.countDown();
}
- };
+ });
deleter.start();
unsubscriber.start();
@@ -1398,42 +1289,36 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
final CountDownLatch counter = new CountDownLatch(2);
final AtomicBoolean gotException = new AtomicBoolean(false);
- Thread deleter = new Thread() {
- @Override
- public void run() {
- try {
- barrier.await();
- // assertTrue(topic.unsubscribe(successSubName).isDone());
- Thread.sleep(5, 0);
- log.info("deleter outcome is {}", topic.delete().get());
- } catch (Exception e) {
- e.printStackTrace();
- gotException.set(true);
- } finally {
- counter.countDown();
- }
+ Thread deleter = new Thread(() -> {
+ try {
+ barrier.await();
+ // assertTrue(topic.unsubscribe(successSubName).isDone());
+ Thread.sleep(5, 0);
+ log.info("deleter outcome is {}", topic.delete().get());
+ } catch (Exception e) {
+ e.printStackTrace();
+ gotException.set(true);
+ } finally {
+ counter.countDown();
}
- };
+ });
- Thread unsubscriber = new Thread() {
- @Override
- public void run() {
- try {
- barrier.await();
- // do subscription delete
- ConcurrentOpenHashMap<String, PersistentSubscription>
subscriptions = topic.getSubscriptions();
- PersistentSubscription ps =
subscriptions.get(successSubName);
- // Thread.sleep(5,0);
- log.info("unsubscriber outcome is {}",
ps.doUnsubscribe(ps.getConsumers().get(0)).get());
- // assertFalse(ps.delete().isCompletedExceptionally());
- } catch (Exception e) {
- e.printStackTrace();
- gotException.set(true);
- } finally {
- counter.countDown();
- }
+ Thread unsubscriber = new Thread(() -> {
+ try {
+ barrier.await();
+ // do subscription delete
+ ConcurrentOpenHashMap<String, PersistentSubscription>
subscriptions = topic.getSubscriptions();
+ PersistentSubscription ps = subscriptions.get(successSubName);
+ // Thread.sleep(5,0);
+ log.info("unsubscriber outcome is {}",
ps.doUnsubscribe(ps.getConsumers().get(0)).get());
+ // assertFalse(ps.delete().isCompletedExceptionally());
+ } catch (Exception e) {
+ e.printStackTrace();
+ gotException.set(true);
+ } finally {
+ counter.countDown();
}
- };
+ });
deleter.start();
unsubscriber.start();
@@ -1447,13 +1332,10 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
PersistentTopic topic = (PersistentTopic)
brokerService.getOrCreateTopic(successTopicName).get();
// override ledger deletion callback to slow down deletion
- doAnswer(new Answer<Object>() {
- @Override
- public Object answer(InvocationOnMock invocationOnMock) throws
Throwable {
- Thread.sleep(1000);
- ((DeleteLedgerCallback)
invocationOnMock.getArguments()[0]).deleteLedgerComplete(null);
- return null;
- }
+ doAnswer(invocationOnMock -> {
+ Thread.sleep(1000);
+ ((DeleteLedgerCallback)
invocationOnMock.getArguments()[0]).deleteLedgerComplete(null);
+ return null;
}).when(ledgerMock).asyncDelete(any(DeleteLedgerCallback.class),
any());
@Cleanup("shutdownNow")
@@ -1500,16 +1382,13 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
cursorMock = mock(ManagedCursorImpl.class);
final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
- doReturn(new ArrayList<Object>()).when(ledgerMock).getCursors();
+ doReturn(new ArrayList<>()).when(ledgerMock).getCursors();
doReturn("mockCursor").when(cursorMock).getName();
doReturn(true).when(cursorMock).isDurable();
// doNothing().when(cursorMock).asyncClose(new CloseCallback() {
- doAnswer(new Answer<Object>() {
- @Override
- public Object answer(InvocationOnMock invocationOnMock) throws
Throwable {
- // return closeFuture.get();
- return closeFuture.complete(null);
- }
+ doAnswer((Answer<Object>) invocationOnMock -> {
+ // return closeFuture.get();
+ return closeFuture.complete(null);
})
.when(cursorMock).asyncClose(new CloseCallback() {
@@ -1530,78 +1409,56 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
}, null);
// call openLedgerComplete with ledgerMock on ML factory asyncOpen
- doAnswer(new Answer<Object>() {
- @Override
- public Object answer(InvocationOnMock invocationOnMock) throws
Throwable {
- ((OpenLedgerCallback)
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
- return null;
- }
- }).when(mlFactoryMock).asyncOpen(matches(".*success.*"),
any(ManagedLedgerConfig.class),
- any(OpenLedgerCallback.class), any(Supplier.class), any());
+ doAnswer(invocationOnMock -> {
+ ((OpenLedgerCallback)
invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
+ return null;
+ }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+ .asyncOpen(matches(".*success.*"),
any(ManagedLedgerConfig.class),
+ any(OpenLedgerCallback.class), any(Supplier.class),
any());
// call openLedgerFailed on ML factory asyncOpen
- doAnswer(new Answer<Object>() {
- @Override
- public Object answer(InvocationOnMock invocationOnMock) throws
Throwable {
- ((OpenLedgerCallback) invocationOnMock.getArguments()[2])
- .openLedgerFailed(new ManagedLedgerException("Managed
ledger failure"), null);
- return null;
- }
- }).when(mlFactoryMock).asyncOpen(matches(".*fail.*"),
any(ManagedLedgerConfig.class),
- any(OpenLedgerCallback.class), any(Supplier.class), any());
+ doAnswer(invocationOnMock -> {
+ ((OpenLedgerCallback) invocationOnMock.getArguments()[2])
+ .openLedgerFailed(new ManagedLedgerException("Managed
ledger failure"), null);
+ return null;
+ }).when(testPulsarServiceFactory.getManagedLedgerFactory())
+ .asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class),
+ any(OpenLedgerCallback.class), any(Supplier.class),
any());
// call addComplete on ledger asyncAddEntry
- doAnswer(new Answer<Object>() {
- @Override
- public Object answer(InvocationOnMock invocationOnMock) throws
Throwable {
- ((AddEntryCallback)
invocationOnMock.getArguments()[1]).addComplete(new PositionImpl(1, 1),
- null,
- invocationOnMock.getArguments()[2]);
- return null;
- }
+ doAnswer(invocationOnMock -> {
+ ((AddEntryCallback)
invocationOnMock.getArguments()[1]).addComplete(new PositionImpl(1, 1),
+ null,
+ invocationOnMock.getArguments()[2]);
+ return null;
}).when(ledgerMock).asyncAddEntry(any(ByteBuf.class),
any(AddEntryCallback.class), any());
// call openCursorComplete on cursor asyncOpen
- doAnswer(new Answer<Object>() {
- @Override
- public Object answer(InvocationOnMock invocationOnMock) throws
Throwable {
- ((OpenCursorCallback)
invocationOnMock.getArguments()[2]).openCursorComplete(cursorMock, null);
- return null;
- }
+ doAnswer(invocationOnMock -> {
+ ((OpenCursorCallback)
invocationOnMock.getArguments()[2]).openCursorComplete(cursorMock, null);
+ return null;
}).when(ledgerMock).asyncOpenCursor(matches(".*success.*"),
any(InitialPosition.class), any(OpenCursorCallback.class), any());
- doAnswer(new Answer<Object>() {
- @Override
- public Object answer(InvocationOnMock invocationOnMock) throws
Throwable {
- ((OpenCursorCallback)
invocationOnMock.getArguments()[4]).openCursorComplete(cursorMock, null);
- return null;
- }
+ doAnswer(invocationOnMock -> {
+ ((OpenCursorCallback)
invocationOnMock.getArguments()[4]).openCursorComplete(cursorMock, null);
+ return null;
}).when(ledgerMock).asyncOpenCursor(matches(".*success.*"),
any(InitialPosition.class), any(Map.class),
any(Map.class), any(OpenCursorCallback.class), any());
- doAnswer(new Answer<Object>() {
- @Override
- public Object answer(InvocationOnMock invocationOnMock) throws
Throwable {
- ((CloseCallback)
invocationOnMock.getArguments()[0]).closeComplete(null);
- return null;
- }
+ doAnswer(invocationOnMock -> {
+ ((CloseCallback)
invocationOnMock.getArguments()[0]).closeComplete(null);
+ return null;
}).when(ledgerMock).asyncClose(any(CloseCallback.class), any());
// call deleteLedgerComplete on ledger asyncDelete
- doAnswer(new Answer<Object>() {
- @Override
- public Object answer(InvocationOnMock invocationOnMock) throws
Throwable {
- ((DeleteLedgerCallback)
invocationOnMock.getArguments()[0]).deleteLedgerComplete(null);
- return null;
- }
+ doAnswer(invocationOnMock -> {
+ ((DeleteLedgerCallback)
invocationOnMock.getArguments()[0]).deleteLedgerComplete(null);
+ return null;
}).when(ledgerMock).asyncDelete(any(DeleteLedgerCallback.class),
any());
- doAnswer(new Answer<Object>() {
- @Override
- public Object answer(InvocationOnMock invocationOnMock) throws
Throwable {
- ((DeleteCursorCallback)
invocationOnMock.getArguments()[1]).deleteCursorComplete(null);
- return null;
- }
+ doAnswer(invocationOnMock -> {
+ ((DeleteCursorCallback)
invocationOnMock.getArguments()[1]).deleteCursorComplete(null);
+ return null;
}).when(ledgerMock).asyncDeleteCursor(matches(".*success.*"),
any(DeleteCursorCallback.class), any());
doAnswer((invokactionOnMock) -> {
@@ -1630,13 +1487,9 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
assertFalse((boolean) isFencedField.get(topic));
assertFalse((boolean) isClosingOrDeletingField.get(topic));
- metadataStore.failConditional(new MetadataStoreException("injected
error"), (op, path) -> {
- if (op == FaultInjectionMetadataStore.OperationType.PUT
- &&
path.equals("/admin/partitioned-topics/prop/use/ns-abc/persistent/successTopic"))
{
- return true;
- }
- return false;
- });
+ metadataStore.failConditional(new MetadataStoreException("injected
error"), (op, path) ->
+ op == FaultInjectionMetadataStore.OperationType.PUT &&
+
path.equals("/admin/partitioned-topics/prop/use/ns-abc/persistent/successTopic"));
try {
topic.delete().get();
fail();
@@ -1830,8 +1683,10 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
String remoteReplicatorName = topic.getReplicatorPrefix() + "." +
remoteCluster;
ConcurrentOpenHashMap<String, Replicator> replicatorMap =
topic.getReplicators();
+ PulsarService pulsar = testPulsarServiceFactory.getPulsarService();
final URL brokerUrl = new URL(
- "http://" + pulsar.getAdvertisedAddress() + ":" +
pulsar.getConfiguration().getBrokerServicePort().get());
+ "http://" + pulsar.getAdvertisedAddress() + ":" +
pulsar.getConfiguration().getBrokerServicePort()
+ .get());
@Cleanup
PulsarClient client =
PulsarClient.builder().serviceUrl(brokerUrl.toString()).build();
ManagedCursor cursor = mock(ManagedCursorImpl.class);
@@ -1871,12 +1726,14 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
String remoteCluster = "remote";
final ManagedLedger ledgerMock = mock(ManagedLedger.class);
doNothing().when(ledgerMock).asyncDeleteCursor(any(), any(), any());
- doReturn(new ArrayList<Object>()).when(ledgerMock).getCursors();
+ doReturn(new ArrayList<>()).when(ledgerMock).getCursors();
PersistentTopic topic = new PersistentTopic(globalTopicName,
ledgerMock, brokerService);
+ PulsarService pulsar = testPulsarServiceFactory.getPulsarService();
final URL brokerUrl = new URL(
- "http://" + pulsar.getAdvertisedAddress() + ":" +
pulsar.getConfiguration().getBrokerServicePort().get());
+ "http://" + pulsar.getAdvertisedAddress() + ":" +
pulsar.getConfiguration().getBrokerServicePort()
+ .get());
@Cleanup
PulsarClient client =
spy(PulsarClient.builder().serviceUrl(brokerUrl.toString()).build());
PulsarClientImpl clientImpl = (PulsarClientImpl) client;
@@ -1907,7 +1764,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
}
@Test
- public void testCompactorSubscription() throws Exception {
+ public void testCompactorSubscription() {
PersistentTopic topic = new PersistentTopic(successTopicName,
ledgerMock, brokerService);
CompactedTopic compactedTopic = mock(CompactedTopic.class);
when(compactedTopic.newCompactedLedger(any(Position.class), anyLong()))
@@ -1924,7 +1781,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
@Test
- public void testCompactorSubscriptionUpdatedOnInit() throws Exception {
+ public void testCompactorSubscriptionUpdatedOnInit() {
long ledgerId = 0xc0bfefeL;
Map<String, Long> properties =
Map.of(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY, ledgerId);
PositionImpl position = new PositionImpl(1, 1);
@@ -1943,15 +1800,15 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
@Test
public void testCompactionTriggeredAfterThresholdFirstInvocation() throws
Exception {
CompletableFuture<Long> compactPromise = new CompletableFuture<>();
- Compactor compactor = pulsar.getCompactor();
+ Compactor compactor =
testPulsarServiceFactory.getPulsarService().getCompactor();
doReturn(compactPromise).when(compactor).compact(anyString());
Policies policies = new Policies();
policies.compaction_threshold = 1L;
- NamespaceResources nsr =
pulsar.getPulsarResources().getNamespaceResources();
- NamespaceName ns =
TopicName.get(successTopicName).getNamespaceObject();
-
doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(nsr).getPoliciesAsync(ns);
+ testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
+
.createPolicies(TopicName.get(successTopicName).getNamespaceObject(),
+ policies);
PersistentTopic topic = new PersistentTopic(successTopicName,
ledgerMock, brokerService);
topic.initialize().get();
@@ -1974,7 +1831,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
@Test
public void testCompactionTriggeredAfterThresholdSecondInvocation() throws
Exception {
CompletableFuture<Long> compactPromise = new CompletableFuture<>();
- Compactor compactor = pulsar.getCompactor();
+ Compactor compactor =
testPulsarServiceFactory.getPulsarService().getCompactor();
doReturn(compactPromise).when(compactor).compact(anyString());
ManagedCursor subCursor = mock(ManagedCursor.class);
@@ -1984,9 +1841,9 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
Policies policies = new Policies();
policies.compaction_threshold = 1L;
- NamespaceResources nsr =
pulsar.getPulsarResources().getNamespaceResources();
- NamespaceName ns =
TopicName.get(successTopicName).getNamespaceObject();
-
doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(nsr).getPoliciesAsync(ns);
+ testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
+
.createPolicies(TopicName.get(successTopicName).getNamespaceObject(),
+ policies);
PersistentTopic topic = new PersistentTopic(successTopicName,
ledgerMock, brokerService);
topic.initialize().get();
@@ -2008,15 +1865,15 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
@Test
public void testCompactionDisabledWithZeroThreshold() throws Exception {
CompletableFuture<Long> compactPromise = new CompletableFuture<>();
- Compactor compactor = pulsar.getCompactor();
+ Compactor compactor =
testPulsarServiceFactory.getPulsarService().getCompactor();
doReturn(compactPromise).when(compactor).compact(anyString());
Policies policies = new Policies();
policies.compaction_threshold = 0L;
- NamespaceResources nsr =
pulsar.getPulsarResources().getNamespaceResources();
- NamespaceName ns =
TopicName.get(successTopicName).getNamespaceObject();
-
doReturn(CompletableFuture.completedFuture(Optional.of(policies))).when(nsr).getPoliciesAsync(ns);
+ testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
+
.createPolicies(TopicName.get(successTopicName).getNamespaceObject(),
+ policies);
doReturn(1000L).when(ledgerMock).getEstimatedBacklogSize();
@@ -2029,7 +1886,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
@Test
public void testBacklogCursor() throws Exception {
int backloggedThreshold = 10;
-
pulsar.getConfiguration().setManagedLedgerCursorBackloggedThreshold(backloggedThreshold);
+
testPulsarServiceFactory.getConfig().setManagedLedgerCursorBackloggedThreshold(backloggedThreshold);
ManagedLedgerImpl ledger = (ManagedLedgerImpl)
factory.open("cache_backlog_ledger");
PersistentTopic topic = new PersistentTopic(successTopicName, ledger,
brokerService);
@@ -2170,13 +2027,11 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
true, serverCnx, "app1", Collections.emptyMap(), false, null,
MessageId.latest, DEFAULT_CONSUMER_EPOCH);
addConsumerToSubscription.invoke(topic, nonDeletableSubscription1,
consumer);
- NamespaceResources nsr =
pulsar.getPulsarResources().getNamespaceResources();
- NamespaceName ns =
TopicName.get(successTopicName).getNamespaceObject();
- doReturn(Optional.of(new Policies())).when(nsr).getPolicies(ns);
+ testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
+
.createPolicies(TopicName.get(successTopicName).getNamespaceObject(),
+ new Policies());
- ServiceConfiguration svcConfig = spy(ServiceConfiguration.class);
- doReturn(5).when(svcConfig).getSubscriptionExpirationTimeMinutes();
- doReturn(svcConfig).when(pulsar).getConfiguration();
+
testPulsarServiceFactory.getConfig().setSubscriptionExpirationTimeMinutes(5);
doReturn(System.currentTimeMillis() -
TimeUnit.MINUTES.toMillis(6)).when(cursorMock).getLastActive();
@@ -2189,8 +2044,6 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
@Test
public void testTopicFencingTimeout() throws Exception {
- ServiceConfiguration svcConfig = spy(ServiceConfiguration.class);
- doReturn(svcConfig).when(pulsar).getConfiguration();
PersistentTopic topic = new PersistentTopic(successTopicName,
ledgerMock, brokerService);
Method fence = PersistentTopic.class.getDeclaredMethod("fence");
@@ -2205,7 +2058,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
Field isClosingOrDeletingField =
PersistentTopic.class.getDeclaredField("isClosingOrDeleting");
isClosingOrDeletingField.setAccessible(true);
- doReturn(10).when(svcConfig).getTopicFencingTimeoutSeconds();
+ testPulsarServiceFactory.getConfig().setTopicFencingTimeoutSeconds(10);
fence.invoke(topic);
unfence.invoke(topic);
ScheduledFuture<?> fencedTopicMonitoringTask = (ScheduledFuture<?>)
fencedTopicMonitoringTaskField.get(topic);
@@ -2214,7 +2067,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
assertFalse((boolean) isFencedField.get(topic));
assertFalse((boolean) isClosingOrDeletingField.get(topic));
- doReturn(1).when(svcConfig).getTopicFencingTimeoutSeconds();
+ testPulsarServiceFactory.getConfig().setTopicFencingTimeoutSeconds(1);
fence.invoke(topic);
Thread.sleep(2000);
fencedTopicMonitoringTask = (ScheduledFuture<?>)
fencedTopicMonitoringTaskField.get(topic);
@@ -2226,7 +2079,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
@Test
public void testTopicCloseFencingTimeout() throws Exception {
- pulsar.getConfiguration().setTopicFencingTimeoutSeconds(10);
+ testPulsarServiceFactory.getConfig().setTopicFencingTimeoutSeconds(10);
Method fence = PersistentTopic.class.getDeclaredMethod("fence");
fence.setAccessible(true);
Field fencedTopicMonitoringTaskField =
PersistentTopic.class.getDeclaredField("fencedTopicMonitoringTask");
@@ -2289,7 +2142,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
}
@Test
- public void testDisconnectProducer() throws Exception {
+ public void testDisconnectProducer() {
PersistentTopic topic = new PersistentTopic(successTopicName,
ledgerMock, brokerService);
String role = "appid1";
Producer producer = new Producer(topic, serverCnx, 1 /* producer id
*/, "prod-name",
@@ -2300,7 +2153,7 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
producer.disconnect();
producer.disconnect();
verify(serverCnx).execute(any());
- };
+ }
@Test
public void testKeySharedMetadataExposedToStats() throws Exception {
@@ -2360,32 +2213,24 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
}
@Test
- public void testGetReplicationClusters() throws Exception {
+ public void testGetReplicationClusters() throws MetadataStoreException {
PersistentTopic topic = new PersistentTopic(successTopicName,
ledgerMock, brokerService);
topic.initialize();
assertEquals(topic.getHierarchyTopicPolicies().getReplicationClusters().get(),
Collections.emptyList());
- PulsarResources pulsarResources =
spyWithClassAndConstructorArgs(PulsarResources.class, metadataStore,
metadataStore);
- NamespaceResources nsr =
spyWithClassAndConstructorArgs(NamespaceResources.class, metadataStore, 30);
- doReturn(nsr).when(pulsarResources).getNamespaceResources();
- PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
- doReturn(pulsarResources).when(pulsar).getPulsarResources();
- });
- CompletableFuture<Optional<Policies>> policiesFuture = new
CompletableFuture<>();
Policies policies = new Policies();
Set<String> namespaceClusters = new HashSet<>();
namespaceClusters.add("namespace-cluster");
policies.replication_clusters = namespaceClusters;
- Optional<Policies> optionalPolicies = Optional.of(policies);
- policiesFuture.complete(optionalPolicies);
- doReturn(policiesFuture).when(nsr).getPoliciesAsync(any());
+ testPulsarServiceFactory.getPulsarResources().getNamespaceResources()
+
.createPolicies(TopicName.get(successTopicName).getNamespaceObject(), policies);
topic = new PersistentTopic(successTopicName, ledgerMock,
brokerService);
topic.initialize();
assertEquals(topic.getHierarchyTopicPolicies().getReplicationClusters().get(),
namespaceClusters);
TopicPoliciesService topicPoliciesService =
mock(TopicPoliciesService.class);
- doReturn(topicPoliciesService).when(pulsar).getTopicPoliciesService();
+
doReturn(topicPoliciesService).when(testPulsarServiceFactory.getPulsarService()).getTopicPoliciesService();
CompletableFuture<Optional<TopicPolicies>> topicPoliciesFuture = new
CompletableFuture<>();
TopicPolicies topicPolicies = new TopicPolicies();
List<String> topicClusters = new ArrayList<>();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxAuthorizationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxAuthorizationTest.java
index a2fc6d8130f..325cb8d84e4 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxAuthorizationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxAuthorizationTest.java
@@ -16,16 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.pulsar.broker.service;
import static
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgsRecordingInvocations;
-import static
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
-import static
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
@@ -38,8 +36,6 @@ import io.jsonwebtoken.SignatureAlgorithm;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;
@@ -47,24 +43,15 @@ import java.util.Base64;
import java.util.Collections;
import java.util.Optional;
import java.util.Properties;
-import java.util.concurrent.CompletableFuture;
import javax.crypto.SecretKey;
-import org.apache.bookkeeper.common.util.OrderedExecutor;
-import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
-import org.apache.pulsar.broker.PulsarService;
-import org.apache.pulsar.broker.PulsarServiceMockSupport;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.TestPulsarService;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
-import org.apache.pulsar.broker.intercept.BrokerInterceptor;
-import org.apache.pulsar.broker.resources.NamespaceResources;
-import org.apache.pulsar.broker.resources.PulsarResources;
-import org.apache.pulsar.broker.resources.TenantResources;
-import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.common.api.proto.CommandConnect;
@@ -74,10 +61,8 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicOperation;
-import org.apache.pulsar.metadata.api.MetadataStore;
-import org.apache.pulsar.metadata.impl.ZKMetadataStore;
-import org.apache.zookeeper.ZooKeeper;
import org.mockito.ArgumentMatcher;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -89,15 +74,14 @@ public class ServerCnxAuthorizationTest {
private final String CLIENT_TOKEN =
Jwts.builder().setSubject(CLIENT_PRINCIPAL).signWith(SECRET_KEY).compact();
private final String PROXY_TOKEN =
Jwts.builder().setSubject(PROXY_PRINCIPAL).signWith(SECRET_KEY).compact();
- private PulsarService pulsar;
- private PulsarResources pulsarResources;
- private BrokerService brokerService;
private ServiceConfiguration svcConfig;
+ protected TestPulsarService.Factory testPulsarServiceFactory;
+ private BrokerService brokerService;
+
@BeforeMethod(alwaysRun = true)
public void beforeMethod() throws Exception {
- EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
- svcConfig = spy(ServiceConfiguration.class);
+ svcConfig = new ServiceConfiguration();
svcConfig.setKeepAliveIntervalSeconds(0);
svcConfig.setBrokerShutdownTimeoutMs(0L);
svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
@@ -111,57 +95,30 @@ public class ServerCnxAuthorizationTest {
properties.setProperty("tokenSecretKey", "data:;base64,"
+ Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded()));
svcConfig.setProperties(properties);
+ testPulsarServiceFactory = TestPulsarService.Factory.builder()
+ .config(svcConfig)
+ .useSpies(true)
+ .build();
+ brokerService = testPulsarServiceFactory.getBrokerService();
+
+
testPulsarServiceFactory.getPulsarResources().getTenantResources().createTenant("public",
+ TenantInfo.builder().build());
+ }
- pulsar =
spyWithClassAndConstructorArgsRecordingInvocations(PulsarService.class,
svcConfig);
- doReturn(new
DefaultSchemaRegistryService()).when(pulsar).getSchemaRegistryService();
-
- doReturn(svcConfig).when(pulsar).getConfiguration();
- PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
-
doReturn(mock(PulsarResources.class)).when(pulsar).getPulsarResources();
- });
-
-
- ManagedLedgerFactory mlFactoryMock = mock(ManagedLedgerFactory.class);
- doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
-
- ZooKeeper mockZk = createMockZooKeeper();
- OrderedExecutor executor =
OrderedExecutor.newBuilder().numThreads(1).build();
- doReturn(createMockBookKeeper(executor))
- .when(pulsar).getBookKeeperClient();
-
- MetadataStore store = new ZKMetadataStore(mockZk);
-
- doReturn(store).when(pulsar).getLocalMetadataStore();
- doReturn(store).when(pulsar).getConfigurationMetadataStore();
-
- pulsarResources =
spyWithClassAndConstructorArgsRecordingInvocations(PulsarResources.class,
store, store);
- PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
- doReturn(pulsarResources).when(pulsar).getPulsarResources();
- });
- NamespaceResources namespaceResources =
-
spyWithClassAndConstructorArgsRecordingInvocations(NamespaceResources.class,
store, 30);
-
doReturn(namespaceResources).when(pulsarResources).getNamespaceResources();
-
- TenantResources tenantResources =
spyWithClassAndConstructorArgsRecordingInvocations(TenantResources.class,
store, 30);
- doReturn(tenantResources).when(pulsarResources).getTenantResources();
-
-
doReturn(CompletableFuture.completedFuture(Optional.of(TenantInfo.builder().build()))).when(tenantResources)
- .getTenantAsync("public");
-
- brokerService =
spyWithClassAndConstructorArgsRecordingInvocations(BrokerService.class, pulsar,
eventLoopGroup);
- BrokerInterceptor interceptor = mock(BrokerInterceptor.class);
- doReturn(interceptor).when(brokerService).getInterceptor();
- PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
- doReturn(brokerService).when(pulsar).getBrokerService();
- doReturn(executor).when(pulsar).getOrderedExecutor();
- });
+ @AfterMethod(alwaysRun = true)
+ public void cleanup() throws Exception {
+ if (testPulsarServiceFactory != null) {
+ testPulsarServiceFactory.close();
+ testPulsarServiceFactory = null;
+ }
}
@Test
public void testVerifyOriginalPrincipalWithAuthDataForwardedFromProxy()
throws Exception {
- doReturn(true).when(svcConfig).isAuthenticateOriginalAuthData();
+ svcConfig.setAuthenticateOriginalAuthData(true);
+
- ServerCnx serverCnx =
spyWithClassAndConstructorArgsRecordingInvocations(ServerCnx.class, pulsar);
+ ServerCnx serverCnx = testPulsarServiceFactory.createServerCnxSpy();
ChannelHandlerContext channelHandlerContext =
mock(ChannelHandlerContext.class);
Channel channel = mock(Channel.class);
ChannelPipeline channelPipeline = mock(ChannelPipeline.class);
@@ -197,7 +154,8 @@ public class ServerCnxAuthorizationTest {
assertEquals(serverCnx.getAuthState().getAuthRole(), PROXY_PRINCIPAL);
AuthorizationService authorizationService =
-
spyWithClassAndConstructorArgsRecordingInvocations(AuthorizationService.class,
svcConfig, pulsarResources);
+
spyWithClassAndConstructorArgsRecordingInvocations(AuthorizationService.class,
svcConfig,
+ testPulsarServiceFactory.getPulsarResources());
doReturn(authorizationService).when(brokerService).getAuthorizationService();
// lookup
@@ -265,9 +223,9 @@ public class ServerCnxAuthorizationTest {
@Test
public void testVerifyOriginalPrincipalWithoutAuthDataForwardedFromProxy()
throws Exception {
- doReturn(false).when(svcConfig).isAuthenticateOriginalAuthData();
+ svcConfig.setAuthenticateOriginalAuthData(false);
- ServerCnx serverCnx =
spyWithClassAndConstructorArgsRecordingInvocations(ServerCnx.class, pulsar);
+ ServerCnx serverCnx = testPulsarServiceFactory.createServerCnxSpy();
ChannelHandlerContext channelHandlerContext =
mock(ChannelHandlerContext.class);
Channel channel = mock(Channel.class);
ChannelPipeline channelPipeline = mock(ChannelPipeline.class);
@@ -298,7 +256,8 @@ public class ServerCnxAuthorizationTest {
assertEquals(serverCnx.getAuthState().getAuthRole(), PROXY_PRINCIPAL);
AuthorizationService authorizationService =
-
spyWithClassAndConstructorArgsRecordingInvocations(AuthorizationService.class,
svcConfig, pulsarResources);
+
spyWithClassAndConstructorArgsRecordingInvocations(AuthorizationService.class,
svcConfig,
+ testPulsarServiceFactory.getPulsarResources());
doReturn(authorizationService).when(brokerService).getAuthorizationService();
// lookup
@@ -359,7 +318,7 @@ public class ServerCnxAuthorizationTest {
@Test
public void testVerifyAuthRoleAndAuthDataFromDirectConnectionBroker()
throws Exception {
- ServerCnx serverCnx =
spyWithClassAndConstructorArgsRecordingInvocations(ServerCnx.class, pulsar);
+ ServerCnx serverCnx = testPulsarServiceFactory.createServerCnxSpy();
ChannelHandlerContext channelHandlerContext =
mock(ChannelHandlerContext.class);
Channel channel = mock(Channel.class);
@@ -390,7 +349,8 @@ public class ServerCnxAuthorizationTest {
assertEquals(serverCnx.getAuthState().getAuthRole(), CLIENT_PRINCIPAL);
AuthorizationService authorizationService =
-
spyWithClassAndConstructorArgsRecordingInvocations(AuthorizationService.class,
svcConfig, pulsarResources);
+
spyWithClassAndConstructorArgsRecordingInvocations(AuthorizationService.class,
svcConfig,
+ testPulsarServiceFactory.getPulsarResources());
doReturn(authorizationService).when(brokerService).getAuthorizationService();
// lookup
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherFailoverConsumerStreamingDispatcherTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherFailoverConsumerStreamingDispatcherTest.java
index abedc1a6688..00e31c51c9c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherFailoverConsumerStreamingDispatcherTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherFailoverConsumerStreamingDispatcherTest.java
@@ -32,6 +32,6 @@ public class
PersistentDispatcherFailoverConsumerStreamingDispatcherTest extends
@BeforeMethod(alwaysRun = true)
public void setup() throws Exception {
super.setup();
- pulsar.getConfiguration().setStreamingDispatch(true);
+ testPulsarServiceFactory.getConfig().setStreamingDispatch(true);
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicStreamingDispatcherTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicStreamingDispatcherTest.java
index e0740a234e1..d7699e0b817 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicStreamingDispatcherTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicStreamingDispatcherTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service.persistent;
+import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.PersistentTopicTest;
import org.apache.pulsar.broker.service.streamingdispatch.StreamingDispatcher;
import org.testng.annotations.BeforeMethod;
@@ -32,8 +33,9 @@ public class PersistentTopicStreamingDispatcherTest extends
PersistentTopicTest
@BeforeMethod(alwaysRun = true)
public void setup() throws Exception {
super.setup();
- pulsar.getConfiguration().setTopicLevelPoliciesEnabled(false);
- pulsar.getConfiguration().setSystemTopicEnabled(false);
- pulsar.getConfiguration().setStreamingDispatch(true);
+ ServiceConfiguration config = testPulsarServiceFactory.getConfig();
+ config.setTopicLevelPoliciesEnabled(false);
+ config.setSystemTopicEnabled(false);
+ config.setStreamingDispatch(true);
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregatorTest.java
index 9bd0b1f1bd4..e63f644f3d0 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregatorTest.java
@@ -25,7 +25,6 @@ import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
import org.apache.bookkeeper.mledger.util.StatsBuckets;
import org.apache.pulsar.broker.PulsarService;
-import org.apache.pulsar.broker.PulsarServiceMockSupport;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Consumer;
@@ -60,10 +59,8 @@ public class NamespaceStatsAggregatorTest {
doReturn(multiLayerTopicsMap).when(broker).getMultiLayerTopicMap();
Mockito.when(pulsar.getLocalMetadataStore()).thenReturn(Mockito.mock(ZKMetadataStore.class));
ServiceConfiguration mockConfig =
Mockito.mock(ServiceConfiguration.class);
- PulsarServiceMockSupport.mockPulsarServiceProps(pulsar, () -> {
- doReturn(mockConfig).when(pulsar).getConfiguration();
- doReturn(broker).when(pulsar).getBrokerService();
- });
+ doReturn(mockConfig).when(pulsar).getConfiguration();
+ doReturn(broker).when(pulsar).getBrokerService();
}
@Test