This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit fbc92a281c36b88d97a82cd7a6a4a57a71a040f6 Author: Lari Hotari <[email protected]> AuthorDate: Fri Jul 14 00:01:22 2023 +0300 [fix][test] Fix resource leak in PulsarTestContext (#20799) (cherry picked from commit ae0fc5bdcae0220da4936d0fdaff71eead219cdd) --- .../org/apache/pulsar/broker/PulsarService.java | 6 +- .../pulsar/broker/service/PersistentTopicTest.java | 5 +- .../testcontext/AbstractTestPulsarService.java | 53 ++++++++------- .../testcontext/NonStartableTestPulsarService.java | 79 ++++------------------ 4 files changed, 49 insertions(+), 94 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index cf8ab33fd7e..6bb477cf037 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -787,7 +787,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { exposeTopicMetrics, offloaderScheduler, interval); this.defaultOffloader = createManagedLedgerOffloader(defaultOffloadPolicies); - this.brokerInterceptor = BrokerInterceptors.load(config); + setBrokerInterceptor(newBrokerInterceptor()); // use getter to support mocking getBrokerInterceptor method in tests BrokerInterceptor interceptor = getBrokerInterceptor(); if (interceptor != null) { @@ -930,6 +930,10 @@ public class PulsarService implements AutoCloseable, ShutdownService { } } + protected BrokerInterceptor newBrokerInterceptor() throws IOException { + return BrokerInterceptors.load(config); + } + @VisibleForTesting protected OrderedExecutor newOrderedExecutor() { return OrderedExecutor.newBuilder() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 45ef58bb703..02b10dd09a2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -131,6 +131,7 @@ import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.compaction.CompactedTopic; import org.apache.pulsar.compaction.CompactedTopicContext; import org.apache.pulsar.compaction.Compactor; +import org.apache.pulsar.compaction.CompactorMXBean; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore; import org.awaitility.Awaitility; @@ -172,11 +173,13 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase { svcConfig.setClusterName("pulsar-cluster"); svcConfig.setTopicLevelPoliciesEnabled(false); svcConfig.setSystemTopicEnabled(false); + Compactor compactor = mock(Compactor.class); + when(compactor.getStats()).thenReturn(mock(CompactorMXBean.class)); pulsarTestContext = PulsarTestContext.builderForNonStartableContext() .config(svcConfig) .spyByDefault() .useTestPulsarResources(metadataStore) - .compactor(mock(Compactor.class)) + .compactor(compactor) .build(); brokerService = pulsarTestContext.getBrokerService(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java index a6861268b94..517d57d0042 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.testcontext; +import java.io.IOException; import org.apache.pulsar.broker.BookKeeperClientFactory; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; @@ -37,11 +38,7 @@ import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; */ abstract class AbstractTestPulsarService extends PulsarService { protected final SpyConfig spyConfig; - protected final MetadataStoreExtended localMetadataStore; - protected final MetadataStoreExtended configurationMetadataStore; - protected final Compactor compactor; - protected final BrokerInterceptor brokerInterceptor; - protected final BookKeeperClientFactory bookKeeperClientFactory; + private boolean compactorExists; public AbstractTestPulsarService(SpyConfig spyConfig, ServiceConfiguration config, MetadataStoreExtended localMetadataStore, @@ -50,53 +47,59 @@ abstract class AbstractTestPulsarService extends PulsarService { BookKeeperClientFactory bookKeeperClientFactory) { super(config); this.spyConfig = spyConfig; - this.localMetadataStore = - NonClosingProxyHandler.createNonClosingProxy(localMetadataStore, MetadataStoreExtended.class); - this.configurationMetadataStore = - NonClosingProxyHandler.createNonClosingProxy(configurationMetadataStore, MetadataStoreExtended.class); - this.compactor = compactor; - this.brokerInterceptor = brokerInterceptor; - this.bookKeeperClientFactory = bookKeeperClientFactory; + setLocalMetadataStore( + NonClosingProxyHandler.createNonClosingProxy(localMetadataStore, MetadataStoreExtended.class)); + setConfigurationMetadataStore( + NonClosingProxyHandler.createNonClosingProxy(configurationMetadataStore, MetadataStoreExtended.class)); + setCompactor(compactor); + setBrokerInterceptor(brokerInterceptor); + setBkClientFactory(bookKeeperClientFactory); } @Override public MetadataStore createConfigurationMetadataStore(PulsarMetadataEventSynchronizer synchronizer) throws MetadataStoreException { if (synchronizer != null) { - synchronizer.registerSyncListener(configurationMetadataStore::handleMetadataEvent); + synchronizer.registerSyncListener( + ((MetadataStoreExtended) getConfigurationMetadataStore())::handleMetadataEvent); } - return configurationMetadataStore; + return getConfigurationMetadataStore(); } @Override public MetadataStoreExtended createLocalMetadataStore(PulsarMetadataEventSynchronizer synchronizer) throws MetadataStoreException, PulsarServerException { if (synchronizer != null) { - synchronizer.registerSyncListener(localMetadataStore::handleMetadataEvent); + synchronizer.registerSyncListener( + getLocalMetadataStore()::handleMetadataEvent); } - return localMetadataStore; + return getLocalMetadataStore(); } @Override - public Compactor newCompactor() throws PulsarServerException { + protected void setCompactor(Compactor compactor) { if (compactor != null) { - return compactor; - } else { - return spyConfig.getCompactor().spy(super.newCompactor()); + compactorExists = true; } + super.setCompactor(compactor); } @Override - public BrokerInterceptor getBrokerInterceptor() { - if (brokerInterceptor != null) { - return brokerInterceptor; + public Compactor newCompactor() throws PulsarServerException { + if (compactorExists) { + return getCompactor(); } else { - return super.getBrokerInterceptor(); + return spyConfig.getCompactor().spy(super.newCompactor()); } } @Override public BookKeeperClientFactory newBookKeeperClientFactory() { - return bookKeeperClientFactory; + return getBkClientFactory(); + } + + @Override + protected BrokerInterceptor newBrokerInterceptor() throws IOException { + return getBrokerInterceptor() != null ? getBrokerInterceptor() : super.newBrokerInterceptor(); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java index 13c4d7d72af..af365ed3193 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java @@ -38,11 +38,9 @@ import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.broker.resources.TopicResources; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService; -import org.apache.pulsar.broker.service.schema.SchemaRegistryService; import org.apache.pulsar.broker.storage.ManagedLedgerStorage; import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider; import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider; -import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; @@ -56,13 +54,6 @@ import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; * for a "non-startable" PulsarService. Please see {@link PulsarTestContext} for more details. */ class NonStartableTestPulsarService extends AbstractTestPulsarService { - private final PulsarResources pulsarResources; - private final ManagedLedgerStorage managedLedgerClientFactory; - private final BrokerService brokerService; - - private final SchemaRegistryService schemaRegistryService; - - private final PulsarClientImpl pulsarClient; private final NamespaceService namespaceService; @@ -76,16 +67,16 @@ class NonStartableTestPulsarService extends AbstractTestPulsarService { Function<BrokerService, BrokerService> brokerServiceCustomizer) { super(spyConfig, config, localMetadataStore, configurationMetadataStore, compactor, brokerInterceptor, bookKeeperClientFactory); - this.pulsarResources = pulsarResources; - this.managedLedgerClientFactory = managedLedgerClientFactory; + setPulsarResources(pulsarResources); + setManagedLedgerClientFactory(managedLedgerClientFactory); try { - this.brokerService = brokerServiceCustomizer.apply( - spyConfig.getBrokerService().spy(TestBrokerService.class, this, getIoEventLoopGroup())); + setBrokerService(brokerServiceCustomizer.apply( + spyConfig.getBrokerService().spy(TestBrokerService.class, this, getIoEventLoopGroup()))); } catch (Exception e) { throw new RuntimeException(e); } - this.schemaRegistryService = spyWithClassAndConstructorArgs(DefaultSchemaRegistryService.class); - this.pulsarClient = mock(PulsarClientImpl.class); + setSchemaRegistryService(spyWithClassAndConstructorArgs(DefaultSchemaRegistryService.class)); + setClient(mock(PulsarClientImpl.class)); this.namespaceService = mock(NamespaceService.class); try { startNamespaceService(); @@ -118,63 +109,17 @@ class NonStartableTestPulsarService extends AbstractTestPulsarService { return () -> namespaceService; } - @Override - public synchronized PulsarClient getClient() throws PulsarServerException { - return pulsarClient; - } - @Override public PulsarClientImpl createClientImpl(ClientConfigurationData clientConf) throws PulsarClientException { - return pulsarClient; - } - - @Override - public SchemaRegistryService getSchemaRegistryService() { - return schemaRegistryService; - } - - @Override - public PulsarResources getPulsarResources() { - return pulsarResources; - } - - public BrokerService getBrokerService() { - return brokerService; - } - - @Override - public MetadataStore getConfigurationMetadataStore() { - return configurationMetadataStore; - } - - @Override - public MetadataStoreExtended getLocalMetadataStore() { - return localMetadataStore; - } - - @Override - public ManagedLedgerStorage getManagedLedgerClientFactory() { - return managedLedgerClientFactory; - } - - @Override - protected PulsarResources newPulsarResources() { - return pulsarResources; - } - - @Override - protected ManagedLedgerStorage newManagedLedgerClientFactory() throws Exception { - return managedLedgerClientFactory; + try { + return (PulsarClientImpl) getClient(); + } catch (PulsarServerException e) { + throw new PulsarClientException(e); + } } - @Override protected BrokerService newBrokerService(PulsarService pulsar) throws Exception { - return brokerService; - } - - @Override - public BookKeeperClientFactory getBookKeeperClientFactory() { - return bookKeeperClientFactory; + return getBrokerService(); } static class TestBrokerService extends BrokerService {
