This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ae0fc5bdcae [fix][test] Fix resource leak in PulsarTestContext (#20799)
ae0fc5bdcae is described below
commit ae0fc5bdcae0220da4936d0fdaff71eead219cdd
Author: Lari Hotari <[email protected]>
AuthorDate: Fri Jul 14 00:01:22 2023 +0300
[fix][test] Fix resource leak in PulsarTestContext (#20799)
---
.../org/apache/pulsar/broker/PulsarService.java | 6 +-
.../pulsar/broker/service/PersistentTopicTest.java | 5 +-
.../testcontext/AbstractTestPulsarService.java | 53 ++++++++-------
.../testcontext/NonStartableTestPulsarService.java | 79 ++++------------------
4 files changed, 49 insertions(+), 94 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 5cf3c47bcb0..40c5a2d6528 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -784,7 +784,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
exposeTopicMetrics, offloaderScheduler, interval);
this.defaultOffloader = createManagedLedgerOffloader(defaultOffloadPolicies);
- this.brokerInterceptor = BrokerInterceptors.load(config);
+ setBrokerInterceptor(newBrokerInterceptor());
// use getter to support mocking getBrokerInterceptor method in tests
BrokerInterceptor interceptor = getBrokerInterceptor();
if (interceptor != null) {
@@ -927,6 +927,10 @@ public class PulsarService implements AutoCloseable, ShutdownService {
}
}
+ protected BrokerInterceptor newBrokerInterceptor() throws IOException {
+ return BrokerInterceptors.load(config);
+ }
+
@VisibleForTesting
protected OrderedExecutor newOrderedExecutor() {
return OrderedExecutor.newBuilder()
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index fefed1aaa0a..c49df3e85ce 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -131,6 +131,7 @@ import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.compaction.CompactedTopic;
import org.apache.pulsar.compaction.CompactedTopicContext;
import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.compaction.CompactorMXBean;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
import org.awaitility.Awaitility;
@@ -172,11 +173,13 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
svcConfig.setClusterName("pulsar-cluster");
svcConfig.setTopicLevelPoliciesEnabled(false);
svcConfig.setSystemTopicEnabled(false);
+ Compactor compactor = mock(Compactor.class);
+ when(compactor.getStats()).thenReturn(mock(CompactorMXBean.class));
pulsarTestContext = PulsarTestContext.builderForNonStartableContext()
.config(svcConfig)
.spyByDefault()
.useTestPulsarResources(metadataStore)
- .compactor(mock(Compactor.class))
+ .compactor(compactor)
.build();
brokerService = pulsarTestContext.getBrokerService();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java
index a6861268b94..517d57d0042 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/AbstractTestPulsarService.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.testcontext;
+import java.io.IOException;
import org.apache.pulsar.broker.BookKeeperClientFactory;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
@@ -37,11 +38,7 @@ import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
*/
abstract class AbstractTestPulsarService extends PulsarService {
protected final SpyConfig spyConfig;
- protected final MetadataStoreExtended localMetadataStore;
- protected final MetadataStoreExtended configurationMetadataStore;
- protected final Compactor compactor;
- protected final BrokerInterceptor brokerInterceptor;
- protected final BookKeeperClientFactory bookKeeperClientFactory;
+ private boolean compactorExists;
public AbstractTestPulsarService(SpyConfig spyConfig, ServiceConfiguration
config,
MetadataStoreExtended localMetadataStore,
@@ -50,53 +47,59 @@ abstract class AbstractTestPulsarService extends PulsarService {
BookKeeperClientFactory
bookKeeperClientFactory) {
super(config);
this.spyConfig = spyConfig;
- this.localMetadataStore =
- NonClosingProxyHandler.createNonClosingProxy(localMetadataStore, MetadataStoreExtended.class);
- this.configurationMetadataStore =
- NonClosingProxyHandler.createNonClosingProxy(configurationMetadataStore, MetadataStoreExtended.class);
- this.compactor = compactor;
- this.brokerInterceptor = brokerInterceptor;
- this.bookKeeperClientFactory = bookKeeperClientFactory;
+ setLocalMetadataStore(
+ NonClosingProxyHandler.createNonClosingProxy(localMetadataStore, MetadataStoreExtended.class));
+ setConfigurationMetadataStore(
+ NonClosingProxyHandler.createNonClosingProxy(configurationMetadataStore, MetadataStoreExtended.class));
+ setCompactor(compactor);
+ setBrokerInterceptor(brokerInterceptor);
+ setBkClientFactory(bookKeeperClientFactory);
}
@Override
public MetadataStore
createConfigurationMetadataStore(PulsarMetadataEventSynchronizer synchronizer)
throws MetadataStoreException {
if (synchronizer != null) {
- synchronizer.registerSyncListener(configurationMetadataStore::handleMetadataEvent);
+ synchronizer.registerSyncListener(
+ ((MetadataStoreExtended) getConfigurationMetadataStore())::handleMetadataEvent);
}
- return configurationMetadataStore;
+ return getConfigurationMetadataStore();
}
@Override
public MetadataStoreExtended
createLocalMetadataStore(PulsarMetadataEventSynchronizer synchronizer)
throws MetadataStoreException, PulsarServerException {
if (synchronizer != null) {
- synchronizer.registerSyncListener(localMetadataStore::handleMetadataEvent);
+ synchronizer.registerSyncListener(
+ getLocalMetadataStore()::handleMetadataEvent);
}
- return localMetadataStore;
+ return getLocalMetadataStore();
}
@Override
- public Compactor newCompactor() throws PulsarServerException {
+ protected void setCompactor(Compactor compactor) {
if (compactor != null) {
- return compactor;
- } else {
- return spyConfig.getCompactor().spy(super.newCompactor());
+ compactorExists = true;
}
+ super.setCompactor(compactor);
}
@Override
- public BrokerInterceptor getBrokerInterceptor() {
- if (brokerInterceptor != null) {
- return brokerInterceptor;
+ public Compactor newCompactor() throws PulsarServerException {
+ if (compactorExists) {
+ return getCompactor();
} else {
- return super.getBrokerInterceptor();
+ return spyConfig.getCompactor().spy(super.newCompactor());
}
}
@Override
public BookKeeperClientFactory newBookKeeperClientFactory() {
- return bookKeeperClientFactory;
+ return getBkClientFactory();
+ }
+
+ @Override
+ protected BrokerInterceptor newBrokerInterceptor() throws IOException {
+ return getBrokerInterceptor() != null ? getBrokerInterceptor() : super.newBrokerInterceptor();
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java
index 13c4d7d72af..af365ed3193 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java
@@ -38,11 +38,9 @@ import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.resources.TopicResources;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService;
-import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
import
org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
-import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
@@ -56,13 +54,6 @@ import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
* for a "non-startable" PulsarService. Please see {@link PulsarTestContext}
for more details.
*/
class NonStartableTestPulsarService extends AbstractTestPulsarService {
- private final PulsarResources pulsarResources;
- private final ManagedLedgerStorage managedLedgerClientFactory;
- private final BrokerService brokerService;
-
- private final SchemaRegistryService schemaRegistryService;
-
- private final PulsarClientImpl pulsarClient;
private final NamespaceService namespaceService;
@@ -76,16 +67,16 @@ class NonStartableTestPulsarService extends AbstractTestPulsarService {
Function<BrokerService,
BrokerService> brokerServiceCustomizer) {
super(spyConfig, config, localMetadataStore,
configurationMetadataStore, compactor, brokerInterceptor,
bookKeeperClientFactory);
- this.pulsarResources = pulsarResources;
- this.managedLedgerClientFactory = managedLedgerClientFactory;
+ setPulsarResources(pulsarResources);
+ setManagedLedgerClientFactory(managedLedgerClientFactory);
try {
- this.brokerService = brokerServiceCustomizer.apply(
- spyConfig.getBrokerService().spy(TestBrokerService.class, this, getIoEventLoopGroup()));
+ setBrokerService(brokerServiceCustomizer.apply(
+ spyConfig.getBrokerService().spy(TestBrokerService.class, this, getIoEventLoopGroup())));
} catch (Exception e) {
throw new RuntimeException(e);
}
- this.schemaRegistryService = spyWithClassAndConstructorArgs(DefaultSchemaRegistryService.class);
- this.pulsarClient = mock(PulsarClientImpl.class);
+ setSchemaRegistryService(spyWithClassAndConstructorArgs(DefaultSchemaRegistryService.class));
+ setClient(mock(PulsarClientImpl.class));
this.namespaceService = mock(NamespaceService.class);
try {
startNamespaceService();
@@ -118,63 +109,17 @@ class NonStartableTestPulsarService extends AbstractTestPulsarService {
return () -> namespaceService;
}
- @Override
- public synchronized PulsarClient getClient() throws PulsarServerException {
- return pulsarClient;
- }
-
@Override
public PulsarClientImpl createClientImpl(ClientConfigurationData
clientConf) throws PulsarClientException {
- return pulsarClient;
- }
-
- @Override
- public SchemaRegistryService getSchemaRegistryService() {
- return schemaRegistryService;
- }
-
- @Override
- public PulsarResources getPulsarResources() {
- return pulsarResources;
- }
-
- public BrokerService getBrokerService() {
- return brokerService;
- }
-
- @Override
- public MetadataStore getConfigurationMetadataStore() {
- return configurationMetadataStore;
- }
-
- @Override
- public MetadataStoreExtended getLocalMetadataStore() {
- return localMetadataStore;
- }
-
- @Override
- public ManagedLedgerStorage getManagedLedgerClientFactory() {
- return managedLedgerClientFactory;
- }
-
- @Override
- protected PulsarResources newPulsarResources() {
- return pulsarResources;
- }
-
- @Override
- protected ManagedLedgerStorage newManagedLedgerClientFactory() throws Exception {
- return managedLedgerClientFactory;
+ try {
+ return (PulsarClientImpl) getClient();
+ } catch (PulsarServerException e) {
+ throw new PulsarClientException(e);
+ }
}
-
@Override
protected BrokerService newBrokerService(PulsarService pulsar) throws
Exception {
- return brokerService;
- }
-
- @Override
- public BookKeeperClientFactory getBookKeeperClientFactory() {
- return bookKeeperClientFactory;
+ return getBrokerService();
}
static class TestBrokerService extends BrokerService {